Follow the public account "JAVA Front" for more articles on source code analysis, practical applications, architecture thinking, workplace sharing, product thinking and more. You are also welcome to add my WeChat "JAVA_front" to exchange and learn together.
0 Article Overview
You may have run into the problem of a full DUBBO thread pool. The first reaction is often panic, and the common fix is to restart the service without knowing whether a restart will actually help. In my view a restart does not solve the problem and may even make it worse. Why? In this article we analyze the causes of the DUBBO thread pool full problem.
1 Basic Knowledge
1.1 DUBBO thread model
1.1.1 Basic Concepts
DUBBO's underlying network communication uses the Netty framework. Let's write a simple Netty server to observe its behavior:
public class NettyServer {

    public static void main(String[] args) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup(8);
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                     .channel(NioServerSocketChannel.class)
                     .option(ChannelOption.SO_BACKLOG, 128)
                     .childOption(ChannelOption.SO_KEEPALIVE, true)
                     .childHandler(new ChannelInitializer<SocketChannel>() {
                         @Override
                         protected void initChannel(SocketChannel ch) throws Exception {
                             ch.pipeline().addLast(new NettyServerHandler());
                         }
                     });
            ChannelFuture channelFuture = bootstrap.bind(7777).sync();
            System.out.println("Server ready");
            channelFuture.channel().closeFuture().sync();
        } catch (Exception ex) {
            System.out.println(ex.getMessage());
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
The bossGroup has a single thread that handles client connection requests. Once the three-way handshake completes, the resulting SocketChannel is handed to the workerGroup to process read and write events. These two thread groups are what we call "IO threads".
Now let's introduce "business threads". When a service provider receives a request, if the handling logic can complete quickly, it can run directly on the IO thread, avoiding thread pool scheduling and context switches. But if the handling is time-consuming, or issues further IO such as database queries, the request must be dispatched to a business thread pool for processing.
DUBBO provides several thread models. To select one, specify the dispatcher attribute in the configuration file:
<dubbo:protocol name="dubbo" dispatcher="all" />
<dubbo:protocol name="dubbo" dispatcher="direct" />
<dubbo:protocol name="dubbo" dispatcher="message" />
<dubbo:protocol name="dubbo" dispatcher="execution" />
<dubbo:protocol name="dubbo" dispatcher="connection" />
DUBBO's official documentation explains, for each thread model, which messages run on IO threads and which on business threads:
all: All messages are dispatched to the business thread pool, including requests, responses, connection events, disconnection events and heartbeat events.
direct: No message is dispatched to the business thread pool; everything runs directly on the IO thread.
message: Only request and response messages are dispatched to the business thread pool; other events such as connection, disconnection and heartbeat run directly on the IO thread.
execution: Only request messages are dispatched to the business thread pool; responses and other events run directly on the IO thread.
connection: Connection and disconnection events are queued on the IO thread and executed one by one; other messages are dispatched to the business thread pool.
1.1.2 Timing
Producers and consumers determine the threading model during initialization:
// Producer
public class NettyServer extends AbstractServer implements Server {
    public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
        super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
    }
}

// Consumer
public class NettyClient extends AbstractClient {
    public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
        super(url, wrapChannelHandler(url, handler));
    }
}
The default thread model for both producer and consumer is AllDispatcher. The ChannelHandlers.wrap method obtains the Dispatcher adaptive extension point. If dispatcher is specified in the configuration file, the extension loader reads the attribute value from the URL and loads the corresponding thread model. Taking the producer as an example:
public class NettyServer extends AbstractServer implements Server {
    public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
        // ChannelHandlers.wrap determines the thread policy
        super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
    }
}

public class ChannelHandlers {
    protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
        return new MultiMessageHandler(new HeartbeatHandler(
                ExtensionLoader.getExtensionLoader(Dispatcher.class).getAdaptiveExtension().dispatch(handler, url)));
    }
}

@SPI(AllDispatcher.NAME)
public interface Dispatcher {
    @Adaptive({Constants.DISPATCHER_KEY, "channel.handler"})
    ChannelHandler dispatch(ChannelHandler handler, URL url);
}
1.1.3 Source code analysis
We analyze the source code of two thread models here; the remaining models can be read in the DUBBO source in the same way. In the AllDispatcher model, all messages are dispatched to the business thread pool, including requests, responses, connection events, disconnection events and heartbeats:
public class AllDispatcher implements Dispatcher {

    // Thread model name
    public static final String NAME = "all";

    // Specify the implementation strategy
    @Override
    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return new AllChannelHandler(handler, url);
    }
}

public class AllChannelHandler extends WrappedChannelHandler {

    @Override
    public void connected(Channel channel) throws RemotingException {
        // Connection events are handed to the business thread pool
        ExecutorService cexecutor = getExecutorService();
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("connect event", channel, getClass() + " error when process connected event", t);
        }
    }

    @Override
    public void disconnected(Channel channel) throws RemotingException {
        // Disconnection events are handed to the business thread pool
        ExecutorService cexecutor = getExecutorService();
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("disconnect event", channel, getClass() + " error when process disconnected event", t);
        }
    }

    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        // Request and response messages are handed to the business thread pool
        ExecutorService cexecutor = getExecutorService();
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            if (message instanceof Request && t instanceof RejectedExecutionException) {
                Request request = (Request) message;
                if (request.isTwoWay()) {
                    String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage();
                    Response response = new Response(request.getId(), request.getVersion());
                    response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
                    response.setErrorMessage(msg);
                    channel.send(response);
                    return;
                }
            }
            throw new ExecutionException(message, channel, getClass() + " error when process received event", t);
        }
    }

    @Override
    public void caught(Channel channel, Throwable exception) throws RemotingException {
        // Exception events are handed to the business thread pool
        ExecutorService cexecutor = getExecutorService();
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
        } catch (Throwable t) {
            throw new ExecutionException("caught event", channel, getClass() + " error when process caught event", t);
        }
    }
}
DirectDispatcher dispatches nothing to the business thread pool; all messages are executed directly on the IO thread:
public class DirectDispatcher implements Dispatcher {

    // Thread model name
    public static final String NAME = "direct";

    // Specify the implementation strategy
    @Override
    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        // Returning the handler itself means all events are handled by the IO thread
        return handler;
    }
}
1.2 DUBBO Thread pool policy
1.2.1 Basic Concepts
The previous section examined the thread model: each model decides whether messages are handled by IO threads or business threads. When a business thread pool is used, the question this section answers is which thread pool policy to apply. DUBBO's thread dispatch model diagram illustrates the relationship between the thread model and the thread pool policy.
DUBBO provides several thread pool policies. To select one, specify the threadpool attribute in the configuration file:
<dubbo:protocol name="dubbo" threadpool="fixed" threads="100" />
<dubbo:protocol name="dubbo" threadpool="cached" threads="100" />
<dubbo:protocol name="dubbo" threadpool="limited" threads="100" />
<dubbo:protocol name="dubbo" threadpool="eager" threads="100" />
Different thread pool policies create thread pools with different characteristics:
fixed: A fixed number of threads.
cached: Threads are reclaimed after being idle for one minute, and new threads are created as new requests arrive.
limited: The number of threads grows with the number of tasks but never exceeds the maximum threshold.
eager: When all core threads are busy, new threads are created to execute tasks first, instead of queueing immediately.
1.2.2 Timing
Taking AllDispatcher as an example, let's see when the thread pool policy is determined:
public class AllDispatcher implements Dispatcher {

    public static final String NAME = "all";

    @Override
    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return new AllChannelHandler(handler, url);
    }
}

public class AllChannelHandler extends WrappedChannelHandler {
    public AllChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
    }
}
In the WrappedChannelHandler constructor, if the threadpool attribute is specified, the extension loader reads the attribute value from the URL and loads the corresponding thread pool policy; the default policy is fixed:
public class WrappedChannelHandler implements ChannelHandlerDelegate {

    public WrappedChannelHandler(ChannelHandler handler, URL url) {
        this.handler = handler;
        this.url = url;
        // Get the thread pool adaptive extension point
        executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);
        String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY;
        if (Constants.CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(Constants.SIDE_KEY))) {
            componentKey = Constants.CONSUMER_SIDE;
        }
        DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
        dataStore.put(componentKey, Integer.toString(url.getPort()), executor);
    }
}

@SPI("fixed")
public interface ThreadPool {
    @Adaptive({Constants.THREADPOOL_KEY})
    Executor getExecutor(URL url);
}
1.2.3 Source code analysis
(1) FixedThreadPool
public class FixedThreadPool implements ThreadPool {

    @Override
    public Executor getExecutor(URL url) {
        // Thread name
        String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
        // Number of threads, default 200
        int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
        // Queue capacity, default 0
        int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
        // Capacity == 0: SynchronousQueue
        // Capacity < 0: unbounded LinkedBlockingQueue
        // Capacity > 0: bounded LinkedBlockingQueue
        return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<Runnable>()
                        : (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                        : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }
}
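Because the default configuration is a fixed number of threads with a SynchronousQueue (queues=0), there is no buffering at all: the moment every thread is busy, the very next task hits the rejection policy. A scaled-down sketch using plain JDK classes (my own illustration, not DUBBO code) reproduces the effect:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class FixedPoolRejectionDemo {

    // Returns true if the third task is rejected while two threads are busy
    static boolean thirdSubmitRejected() {
        // 2 fixed threads with a SynchronousQueue: the same shape as "fixed" with queues=0
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 0, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>());
        CountDownLatch release = new CountDownLatch(1);
        Runnable slowTask = () -> {
            try { release.await(); } catch (InterruptedException ignored) { }
        };
        pool.execute(slowTask); // occupies thread 1
        pool.execute(slowTask); // occupies thread 2
        boolean rejected = false;
        try {
            pool.execute(slowTask); // no idle thread and no queue capacity
        } catch (RejectedExecutionException e) {
            rejected = true; // this is the "Thread pool is EXHAUSTED" situation
        }
        release.countDown();
        pool.shutdown();
        return rejected;
    }

    public static void main(String[] args) {
        System.out.println("third task rejected: " + thirdSubmitRejected());
    }
}
```

Scaling the numbers up, 200 busy threads plus one more request is exactly the situation the error logs in section 2 report.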
(2) CachedThreadPool
public class CachedThreadPool implements ThreadPool {

    @Override
    public Executor getExecutor(URL url) {
        // Thread name
        String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
        // Number of core threads, default 0
        int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
        // Maximum number of threads, default Integer.MAX_VALUE
        int threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE);
        // Queue capacity, default 0
        int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
        // Idle time before a thread is reclaimed, default 1 minute
        int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE);
        // Capacity == 0: SynchronousQueue
        // Capacity < 0: unbounded LinkedBlockingQueue
        // Capacity > 0: bounded LinkedBlockingQueue
        return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<Runnable>()
                        : (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                        : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }
}
(3) LimitedThreadPool
public class LimitedThreadPool implements ThreadPool {

    @Override
    public Executor getExecutor(URL url) {
        // Thread name
        String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
        // Number of core threads, default 0
        int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
        // Maximum number of threads, default 200
        int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
        // Queue capacity, default 0
        int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
        // Capacity == 0: SynchronousQueue
        // Capacity < 0: unbounded LinkedBlockingQueue
        // Capacity > 0: bounded LinkedBlockingQueue
        // Keep-alive time of Long.MAX_VALUE means idle threads are never reclaimed
        return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<Runnable>()
                        : (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                        : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }
}
(4) EagerThreadPool
Recall how a standard ThreadPoolExecutor behaves: once the number of core threads reaches the threshold, new tasks are put into the queue; only when the queue is full are additional threads created, and when the thread count reaches the maximum the rejection policy is executed.
EagerThreadPool customizes this execution policy: when all core threads are busy, new tasks are not queued but start new threads (provided the current thread count is below the maximum). Only when the thread count reaches the maximum are tasks put into the queue.
public class EagerThreadPool implements ThreadPool {

    @Override
    public Executor getExecutor(URL url) {
        // Thread name
        String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
        // Number of core threads, default 0
        int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
        // Maximum number of threads, default Integer.MAX_VALUE
        int threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE);
        // Queue capacity, default 0
        int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
        // Idle time before a thread is reclaimed, default 1 minute
        int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE);
        // Initialize the custom thread pool and queue, which override the relevant methods
        TaskQueue<Runnable> taskQueue = new TaskQueue<Runnable>(queues <= 0 ? 1 : queues);
        EagerThreadPoolExecutor executor = new EagerThreadPoolExecutor(cores,
                threads,
                alive,
                TimeUnit.MILLISECONDS,
                taskQueue,
                new NamedInternalThreadFactory(name, true),
                new AbortPolicyWithReport(name, url));
        taskQueue.setExecutor(executor);
        return executor;
    }
}
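The core of the trick lives in TaskQueue.offer, which consults the executor before queueing: returning false while the pool can still grow forces ThreadPoolExecutor to create a thread instead. A simplified, self-contained sketch of the same idea (the class names here are invented for illustration; DUBBO's actual TaskQueue and EagerThreadPoolExecutor are more complete):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Refuse to queue while the pool can still grow, so the executor adds threads first
class EagerishQueue extends LinkedBlockingQueue<Runnable> {

    private ThreadPoolExecutor executor;

    EagerishQueue(int capacity) { super(capacity); }

    void setExecutor(ThreadPoolExecutor executor) { this.executor = executor; }

    @Override
    public boolean offer(Runnable r) {
        // While current threads < max, report the queue as "full" so that
        // ThreadPoolExecutor starts a new thread; queue only once maxed out
        if (executor.getPoolSize() < executor.getMaximumPoolSize()) {
            return false;
        }
        return super.offer(r);
    }
}

public class EagerSketch {

    // Submit 4 blocking tasks to a pool with core=1, max=4; report {threads, queued}
    static int[] run() {
        EagerishQueue queue = new EagerishQueue(100);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 4, 60, TimeUnit.SECONDS, queue);
        queue.setExecutor(pool);
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < 4; i++) {
            pool.execute(() -> {
                try { release.await(); } catch (InterruptedException ignored) { }
            });
        }
        int[] result = new int[] { pool.getPoolSize(), queue.size() };
        release.countDown();
        pool.shutdown();
        return result;
    }

    public static void main(String[] args) {
        int[] r = run();
        // A plain LinkedBlockingQueue would leave 3 tasks queued behind 1 thread;
        // the eager queue grows the pool to 4 threads with nothing queued
        System.out.println("threads=" + r[0] + " queued=" + r[1]);
    }
}
```

Note that DUBBO's real EagerThreadPoolExecutor additionally retries offering a task to the queue if thread creation loses a race and is rejected; the sketch omits that for brevity.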
1.3 A Formula
We know that DUBBO uses a thread pool policy for business processing, so how do we estimate how many threads may be needed? Consider a problem first: a company has 7,200 employees who clock in between 8:00 and 8:30 every morning, and each clock-in takes the system 5 seconds. What are the RT, QPS and concurrency?
RT stands for response time, and the question already tells us:
RT = 5
QPS represents the query volume per second, assuming that the check-in behavior is evenly distributed:
QPS = 7200 / (30 * 60) = 4
Concurrency represents the number of simultaneous requests processed by the system:
Concurrency = QPS x RT = 4 x 5 = 20
According to the above examples, the following formula is derived:
Concurrency = QPS x RT
If the system allocates one processing thread per request, concurrency is approximately the number of threads in use. From the formula it is easy to see that concurrency is driven by QPS and RT: an increase in either raises concurrency.
In reality, however, concurrency is bounded by system capacity and cannot grow indefinitely. For example, the DUBBO thread pool has a maximum thread count; exceeding it triggers the rejection policy, which reports that the thread pool is exhausted. The following sections analyze rises in RT and rises in QPS respectively.
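The relationship above is essentially Little's law, and it is worth being able to compute it quickly. A small illustrative helper (the names are mine, not DUBBO's):

```java
public class ConcurrencyEstimate {

    // Little's law: concurrent requests = arrival rate (QPS) * response time (seconds)
    static double concurrency(double qps, double rtSeconds) {
        return qps * rtSeconds;
    }

    public static void main(String[] args) {
        // Clock-in example from the text: 7200 requests over 30 minutes, RT = 5s
        double qps = 7200.0 / (30 * 60);
        System.out.println(concurrency(qps, 5));  // 20.0
        // If RT degrades to 60s (a slow service), the same QPS needs 240 threads,
        // which already exceeds DUBBO's default fixed pool of 200
        System.out.println(concurrency(qps, 60)); // 240.0
    }
}
```

The second line of output previews the rest of this article: holding QPS constant, a modest rise in RT is enough to blow past the default pool size.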
2 RT Rises
2.1 A slow service on the producer
2.1.1 Cause analysis
(1) Producer allocation
<beans>
    <dubbo:registry address="zookeeper://127.0.0.1:2181" />
    <dubbo:protocol name="dubbo" port="9999" />
    <dubbo:service interface="com.java.front.dubbo.demo.provider.HelloService" ref="helloService" />
</beans>
(2) Producer business
package com.java.front.dubbo.demo.provider;

public interface HelloService {
    public String sayHello(String name) throws Exception;
}

public class HelloServiceImpl implements HelloService {
    public String sayHello(String name) throws Exception {
        String result = "hello[" + name + "]";
        // Simulate a slow service
        Thread.sleep(10000L);
        System.out.println("Producer execution result: " + result);
        return result;
    }
}
(3) Consumer configuration
<beans>
    <dubbo:registry address="zookeeper://127.0.0.1:2181" />
    <dubbo:reference id="helloService" interface="com.java.front.dubbo.demo.provider.HelloService" />
</beans>
(4) Consumer business
public class Consumer {

    @Test
    public void testThread() {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[] { "classpath*:META-INF/spring/dubbo-consumer.xml" });
        context.start();
        for (int i = 0; i < 500; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    HelloService helloService = (HelloService) context.getBean("helloService");
                    String result;
                    try {
                        result = helloService.sayHello("WeChat official account JAVA Front");
                        System.out.println("Client received result: " + result);
                    } catch (Exception e) {
                        System.out.println(e.getMessage());
                    }
                }
            }).start();
        }
    }
}
Run the producer and then the consumer, and error messages appear in the logs. The producer log shows that the thread pool is exhausted:
Caused by: java.util.concurrent.RejectedExecutionException: Thread pool is EXHAUSTED! Thread Name: DubboServerHandler-x.x.x.x:9999, Pool Size: 200 (active: 200, core: 200, max: 200, largest: 200), Task: 201 (completed: 1), Executor status:(isShutdown:false, isTerminated:false, isTerminating:false), in dubbo://x.x.x.x:9999!
at org.apache.dubbo.common.threadpool.support.AbortPolicyWithReport.rejectedExecution(AbortPolicyWithReport.java:67)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at org.apache.dubbo.remoting.transport.dispatcher.all.AllChannelHandler.caught(AllChannelHandler.java:88)
... 25 more
The consumer log not only shows that the thread pool is exhausted, it also prints the service provider information and the invoked method, which lets us locate the problematic method:
Failed to invoke the method sayHello in the service com.java.front.dubbo.demo.provider.HelloService. Tried 3 times of the providers [x.x.x.x:9999] (1/1) from the registry 127.0.0.1:2181 on the consumer x.x.x.x using the dubbo version 2.7.0-SNAPSHOT. Last error is: Failed to invoke remote method: sayHello, provider: dubbo://x.x.x.x:9999/com.java.front.dubbo.demo.provider.HelloService?anyhost=true&application=xpz-consumer1&check=false&dubbo=2.0.2&generic=false&group=&interface=com.java.front.dubbo.demo.provider.HelloService&logger=log4j&methods=sayHello&pid=33432&register.ip=x.x.x.x&release=2.7.0-SNAPSHOT&remote.application=xpz-provider&remote.timestamp=1618632597509&side=consumer&timeout=100000000&timestamp=1618632617392, cause: Server side(x.x.x.x,9999) threadpool is exhausted ,detail msg:Thread pool is EXHAUSTED! Thread Name: DubboServerHandler-x.x.x.x:9999, Pool Size: 200 (active: 200, core: 200, max: 200, largest: 200), Task: 401 (completed: 201), Executor status:(isShutdown:false, isTerminated:false, isTerminating:false), in dubbo://x.x.x.x:9999!
2.1.2 Solution
(1) Identify slow services
When the DUBBO thread pool is full, the reject policy is executed:
public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy {

    protected static final Logger logger = LoggerFactory.getLogger(AbortPolicyWithReport.class);

    private final String threadName;

    private final URL url;

    private static volatile long lastPrintTime = 0;

    private static Semaphore guard = new Semaphore(1);

    public AbortPolicyWithReport(String threadName, URL url) {
        this.threadName = threadName;
        this.url = url;
    }

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        String msg = String.format("Thread pool is EXHAUSTED!" +
                " Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d)," +
                " Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!",
                threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(),
                e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(),
                url.getProtocol(), url.getIp(), url.getPort());
        logger.warn(msg);
        // Print a thread snapshot
        dumpJStack();
        throw new RejectedExecutionException(msg);
    }

    private void dumpJStack() {
        long now = System.currentTimeMillis();
        // Output a thread snapshot at most once every 10 minutes
        if (now - lastPrintTime < 10 * 60 * 1000) {
            return;
        }
        if (!guard.tryAcquire()) {
            return;
        }
        ExecutorService pool = Executors.newSingleThreadExecutor();
        pool.execute(() -> {
            String dumpPath = url.getParameter(Constants.DUMP_DIRECTORY, System.getProperty("user.home"));
            System.out.println("AbortPolicyWithReport dumpJStack directory=" + dumpPath);
            SimpleDateFormat sdf;
            String os = System.getProperty("os.name").toLowerCase();
            // Linux file location:   /home/xxx/Dubbo_JStack.log.2021-01-01_20:50:15
            // Windows file location: /user/xxx/Dubbo_JStack.log.2020-01-01_20-50-15
            if (os.contains("win")) {
                sdf = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss");
            } else {
                sdf = new SimpleDateFormat("yyyy-MM-dd_HH:mm:ss");
            }
            String dateStr = sdf.format(new Date());
            try (FileOutputStream jStackStream = new FileOutputStream(new File(dumpPath, "Dubbo_JStack.log" + "." + dateStr))) {
                JVMUtil.jstack(jStackStream);
            } catch (Throwable t) {
                logger.error("dump jStack error", t);
            } finally {
                guard.release();
            }
            lastPrintTime = System.currentTimeMillis();
        });
        pool.shutdown();
    }
}
The rejection policy outputs a thread snapshot file to preserve the scene. When analyzing a thread snapshot, focus on the BLOCKED and TIMED_WAITING thread states: if a large number of threads are blocked or waiting, you can locate the specific lines of code:
DubboServerHandler-x.x.x.x:9999-thread-200 Id=230 TIMED_WAITING
at java.lang.Thread.sleep(Native Method)
at com.java.front.dubbo.demo.provider.HelloServiceImpl.sayHello(HelloServiceImpl.java:13)
at org.apache.dubbo.common.bytecode.Wrapper1.invokeMethod(Wrapper1.java)
at org.apache.dubbo.rpc.proxy.javassist.JavassistProxyFactory$1.doInvoke(JavassistProxyFactory.java:56)
at org.apache.dubbo.rpc.proxy.AbstractProxyInvoker.invoke(AbstractProxyInvoker.java:85)
at org.apache.dubbo.config.invoker.DelegateProviderMetaDataInvoker.invoke(DelegateProviderMetaDataInvoker.java:56)
at org.apache.dubbo.rpc.protocol.InvokerWrapper.invoke(InvokerWrapper.java:56)
(2) Optimize slow service
Now that the slow service has been found, the next step is to optimize it. How to optimize depends on the specific case and is not the focus of this article, so we will not expand on it here.
2.2 Insufficient producer warm-up
2.2.1 Cause analysis
There is another cause of rising RT we cannot ignore: a provider that is called right after a restart, before it has warmed up sufficiently. A producer needs to warm up on startup, establishing connections to resources such as databases and caches, and establishing connections takes time. If a large number of consumer requests hit an unwarmed producer, connection setup inflates response time and RT inevitably rises, which can likewise fill the DUBBO thread pool.
2.2.2 Solution
(1) Wait for the producer to warm up fully
A thread pool that fills up because of insufficient warm-up most likely happens at release time. For example, if you publish one machine and find its thread pool full, do not keep restarting it; give the machine some time to warm up, and the problem will disappear once the connections are established. We should also release in batches, rather than publishing too many instances at once and causing a large-scale impact from warm-up problems.
(2) Upgrade DUBBO to version 2.7.4 or later
DUBBO consumers already apply a warm-up mechanism when selecting providers, so why is warm-up still insufficient? Because versions before 2.5.5, as well as 2.7.2, had bugs in the warm-up mechanism; in short, the provider start time was obtained incorrectly, so warm-up did not take effect. Version 2.7.4 fixes this completely, so we should avoid the problematic versions. Let's look at how the warm-up mechanism works:
public class RandomLoadBalance extends AbstractLoadBalance {

    public static final String NAME = "random";

    @Override
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        // Number of invokers
        int length = invokers.size();
        // Whether every invoker has the same weight
        boolean sameWeight = true;
        // Weight array for the invokers
        int[] weights = new int[length];
        // Weight of the first invoker
        int firstWeight = getWeight(invokers.get(0), invocation);
        weights[0] = firstWeight;
        // Sum of the weights
        int totalWeight = firstWeight;
        for (int i = 1; i < length; i++) {
            // Compute each weight
            int weight = getWeight(invokers.get(i), invocation);
            weights[i] = weight;
            totalWeight += weight;
            // If any invoker's weight differs from the first one's, set sameWeight to false
            if (sameWeight && weight != firstWeight) {
                sameWeight = false;
            }
        }
        // If the weights differ, select according to the total weight
        if (totalWeight > 0 && !sameWeight) {
            int offset = ThreadLocalRandom.current().nextInt(totalWeight);
            // Keep subtracting weights; return as soon as the offset drops below 0
            for (int i = 0; i < length; i++) {
                offset -= weights[i];
                if (offset < 0) {
                    return invokers.get(i);
                }
            }
        }
        // If all weights are equal, pick one at random
        return invokers.get(ThreadLocalRandom.current().nextInt(length));
    }
}

public abstract class AbstractLoadBalance implements LoadBalance {

    static int calculateWarmupWeight(int uptime, int warmup, int weight) {
        // ww = uptime / (warmup / weight)
        // While the provider is still inside its warm-up window, the configured
        // weight is scaled down by the ratio uptime/warmup, so even a provider
        // configured with a high weight gets a small weight before warm-up completes
        int ww = (int) ((float) uptime / ((float) warmup / (float) weight));
        return ww < 1 ? 1 : (ww > weight ? weight : ww);
    }

    protected int getWeight(Invoker<?> invoker, Invocation invocation) {
        // Get the configured weight, default 100
        int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT);
        if (weight > 0) {
            // Timestamp at which the provider published the service
            long timestamp = invoker.getUrl().getParameter(Constants.REMOTE_TIMESTAMP_KEY, 0L);
            if (timestamp > 0L) {
                // How long the service has been published
                int uptime = (int) (System.currentTimeMillis() - timestamp);
                // Warm-up time, default 10 minutes
                int warmup = invoker.getUrl().getParameter(Constants.WARMUP_KEY, Constants.DEFAULT_WARMUP);
                // Published for more than 0 ms but less than the warm-up time
                if (uptime > 0 && uptime < warmup) {
                    // Recalculate the weight
                    weight = calculateWarmupWeight(uptime, warmup, weight);
                }
            }
        }
        // Once uptime exceeds the warm-up time, the configured weight is returned as-is
        return weight >= 0 ? weight : 0;
    }
}
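The effect of calculateWarmupWeight is easy to verify in isolation: with the default weight of 100 and a 10-minute warm-up, a freshly started provider's weight ramps linearly from 1 up to 100 over its first ten minutes, so it receives proportionally less traffic while it warms up. A small self-contained check mirroring the formula above:

```java
public class WarmupWeightDemo {

    // Mirrors AbstractLoadBalance.calculateWarmupWeight: scale the configured
    // weight by uptime/warmup, clamped to the range [1, weight]
    static int warmupWeight(int uptimeMs, int warmupMs, int weight) {
        int ww = (int) ((float) uptimeMs / ((float) warmupMs / (float) weight));
        return ww < 1 ? 1 : Math.min(ww, weight);
    }

    public static void main(String[] args) {
        int warmup = 10 * 60 * 1000; // default 10-minute warm-up
        int weight = 100;            // default configured weight
        System.out.println(warmupWeight(60_000, warmup, weight));  // up 1 minute  -> 10
        System.out.println(warmupWeight(300_000, warmup, weight)); // up 5 minutes -> 50
        System.out.println(warmupWeight(600_000, warmup, weight)); // fully warmed -> 100
    }
}
```

This also shows why a broken start timestamp is so damaging: if uptime is computed incorrectly, the provider jumps straight to full weight and full traffic before its connections are ready.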
3 QPS Rises
The previous sections discussed at length the thread pool filling up because RT rises. Now consider the other parameter, QPS. When upstream traffic surges, a flood of requests pours in and the thread pool fills up. If QPS exceeds what the system can bear, we have to protect the system with a degradation plan; see my earlier article "High Availability of Technical Systems from the Perspective of Anti-Vulnerability".
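As a minimal degradation sketch (my own illustration, not a DUBBO feature; production systems typically use a dedicated rate-limiting or circuit-breaking component, or DUBBO's own concurrency-limit configuration), a semaphore-based limiter can shed excess requests before they ever reach the business thread pool:

```java
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

// A minimal fail-fast concurrency limiter: shed load before the thread pool fills up
public class ConcurrencyLimiter {

    private final Semaphore permits;

    ConcurrencyLimiter(int maxConcurrent) {
        this.permits = new Semaphore(maxConcurrent);
    }

    // Run the task if a permit is available, otherwise degrade immediately
    <T> T call(Supplier<T> task, Supplier<T> fallback) {
        if (!permits.tryAcquire()) {
            return fallback.get();
        }
        try {
            return task.get();
        } finally {
            permits.release();
        }
    }

    public static void main(String[] args) {
        ConcurrencyLimiter limiter = new ConcurrencyLimiter(1);
        System.out.println(limiter.call(() -> "served", () -> "degraded"));   // served
        ConcurrencyLimiter exhausted = new ConcurrencyLimiter(0);
        System.out.println(exhausted.call(() -> "served", () -> "degraded")); // degraded
    }
}
```

The point of failing fast is that a degraded answer returned immediately is cheaper than a request that occupies a business thread until the pool is exhausted.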
4 Article Summary
This article first introduced the DUBBO thread model and thread pool policies, then derived a formula showing that concurrency is driven by RT and QPS: an increase in either can fill the thread pool. A slow service or insufficient producer warm-up can raise RT, while an upstream traffic surge raises QPS, and solutions for each were given along the way. A full DUBBO thread pool is a problem that deserves attention, and I hope this article helps.