Dubbo performance tuning parameters and principles
- 1 Dubbo call model
- 2 Common performance tuning parameters
- 3 source code and principle analysis
- 3.1 the threads
- 3.2 iothreads
- 3.3 the queues
- 3.4 connections
- 3.5 actives
- 3.6 accepts
- 3.7 executes
1 Dubbo call model
2 Common performance tuning parameters
Parameter names | scope | The default value | instructions | note |
---|---|---|---|---|
threads | provider | 200 | Business processing thread pool size | |
iothreads | provider | The number of CPU + 1 | I/O thread pool size | |
queues | provider | 0 | Thread pool queue size, when the thread pool is full, queue queue size waiting for execution, it is recommended not to set, when the thread pool should fail immediately, retry other service provider machines, rather than queue, unless there is a special requirement. | |
connections | consumer | 0 | For the maximum number of connections per provider, short connection protocols such as RMI, HTTP, and Hessian represent the number of limited connections, while Dubbo represents the number of established long connections | The Dubbo protocol shares a long connection by default |
actives | consumer | 0 | Maximum number of concurrent calls per service consumer per service per method | 0 indicates no limit |
accepts | provider | 0 | Maximum number of connections acceptable to the service provider | 0 indicates no limit |
executes | provider | 0 | The maximum number of requests that a service provider can execute in parallel per service per method | 0 indicates no limit |
3 source code and principle analysis
3.1 the threads
FixedThreadPool.java
public Executor getExecutor(URL url) {
String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue<Runnable>() :
(queues < 0 ? new LinkedBlockingQueue<Runnable>() :
new LinkedBlockingQueue<Runnable>(queues)),
new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
Copy the code
LimitedThreadPool.java
public Executor getExecutor(URL url) {
String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue<Runnable>() :
(queues < 0 ? new LinkedBlockingQueue<Runnable>() :
new LinkedBlockingQueue<Runnable>(queues)),
new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
Copy the code
Constants.DEFAULT_QUEUES = 200. The Threads parameter configures the maximum (or core) number of threads in the business processing thread pool.
3.2 iothreads
NettyServer.java
Override protected void doOpen() throws Throwable { NettyHelper.setNettyLoggerFactory(); ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true)); ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true)); ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS)); bootstrap = new ServerBootstrap(channelFactory); final NettyHandler nettyHandler = new NettyHandler(getUrl(), this); channels = nettyHandler.getChannels(); bootstrap.setPipelineFactory(new ChannelPipelineFactory() { public ChannelPipeline getPipeline() { NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec() ,getUrl(), NettyServer.this); ChannelPipeline pipeline = Channels.pipeline(); pipeline.addLast("decoder", adapter.getDecoder()); pipeline.addLast("encoder", adapter.getEncoder()); pipeline.addLast("handler", nettyHandler); return pipeline; }}); // bind channel = bootstrap.bind(getBindAddress()); }Copy the code
3.3 the queues
Java, LimitedThreadPool. Java, and CachedThreadPool. Java, respectively. See Section 3.2 for code details. As can be seen from the code, the default value is 0, indicating the use of synchronous blocking queues; If queues are set to a value less than 0, use blocking linked list queues with capacity integer.max_value. If it is any other value, the blocking list queue of the specified size is used.
3.4 connections
DubboProtocol.java
Private ExchangeClient[] getClients(URL URL){// Whether to share the connection Boolean Service_SHARE_connect = false; int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0); If (connections == 0){service_share_connect = true; connections = 1; } ExchangeClient[] clients = new ExchangeClient[connections]; for (int i = 0; i < clients.length; i++) { if (service_share_connect){ clients[i] = getSharedClient(url); } else { clients[i] = initClient(url); } } return clients; }Copy the code
DubboInvoker.java
Override protected Result doInvoke(final Invocation invocation) throws Throwable { RpcInvocation inv = (RpcInvocation) invocation; final String methodName = RpcUtils.getMethodName(invocation); inv.setAttachment(Constants.PATH_KEY, getUrl().getPath()); inv.setAttachment(Constants.VERSION_KEY, version); ExchangeClient currentClient; if (clients.length == 1) { currentClient = clients[0]; } else { currentClient = clients[index.getAndIncrement() % clients.length]; } try { boolean isAsync = RpcUtils.isAsync(getUrl(), invocation); boolean isOneway = RpcUtils.isOneway(getUrl(), invocation); int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT); if (isOneway) { boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false); currentClient.send(inv, isSent); RpcContext.getContext().setFuture(null); return new RpcResult(); } else if (isAsync) { ResponseFuture future = currentClient.request(inv, timeout) ; RpcContext.getContext().setFuture(new FutureAdapter<Object>(future)); return new RpcResult(); } else { RpcContext.getContext().setFuture(null); return (Result) currentClient.request(inv, timeout).get(); } } catch (TimeoutException e) { throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } catch (RemotingException e) { throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); }}Copy the code
The default value is 0, indicating that all clients share one long connection for each Provider. Otherwise, a specified number of long connections are established. When called, if there are more than one long connection, polling is used to obtain one long connection.
3.5 actives
ActiveLimitFilter.java
public Result invoke(Invoker<? > invoker, Invocation invocation) throws RpcException { URL url = invoker.getUrl(); String methodName = invocation.getMethodName(); int max = invoker.getUrl().getMethodParameter(methodName, Constants.ACTIVES_KEY, 0); RpcStatus count = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()); if (max > 0) { long timeout = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, 0); long start = System.currentTimeMillis(); long remain = timeout; int active = count.getActive(); if (active >= max) { synchronized (count) { while ((active = count.getActive()) >= max) { try { count.wait(remain); } catch (InterruptedException e) { } long elapsed = System.currentTimeMillis() - start; remain = timeout - elapsed; if (remain <= 0) { throw new RpcException("Waiting concurrent invoke timeout in client-side for service: " + invoker.getInterface().getName() + ", method: " + invocation.getMethodName() + ", elapsed: " + elapsed + ", timeout: " + timeout + ". concurrent invokes: " + active + ". max concurrent invoke limit: " + max); } } } } } try { long begin = System.currentTimeMillis(); RpcStatus.beginCount(url, methodName); try { Result result = invoker.invoke(invocation); RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, true); return result; } catch (RuntimeException t) { RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, false); throw t; } } finally { if(max>0){ synchronized (count) { count.notify(); }}}}Copy the code
When a Consumer call is made, the service and method dimensions are counted, and if the number of concurrent calls exceeds the set maximum, the current thread is blocked until the request is processed.
3.6 accepts
AbstractServer.java
@Override
public void connected(Channel ch) throws RemotingException {
Collection<Channel> channels = getChannels();
if (accepts > 0 && channels.size() > accepts) {
logger.error("Close channel " + ch + ", cause: The server " + ch.getLocalAddress() + " connections greater than max config " + accepts);
ch.close();
return;
}
super.connected(ch);
}
Copy the code
When the number of connections exceeds the maximum value, the current connection is closed.
3.7 executes
ExecuteLimitFilter.jvava
public Result invokeOrg(Invoker<?> invoker, Invocation invocation) throws RpcException {
URL url = invoker.getUrl();
String methodName = invocation.getMethodName();
int max = url.getMethodParameter(methodName, Constants.EXECUTES_KEY, 0);
if (max > 0) {
RpcStatus count = RpcStatus.getStatus(url, invocation.getMethodName());
if (count.getActive() >= max) {
throw new RpcException("Failed to invoke method " + invocation.getMethodName() + " in provider " + url + ", cause: The service using threads greater than <dubbo:service executes=\"" + max + "\" /> limited.");
}
}
long begin = System.currentTimeMillis();
boolean isException = false;
RpcStatus.beginCount(url, methodName);
try {
Result result = invoker.invoke(invocation);
return result;
} catch (Throwable t) {
isException = true;
if(t instanceof RuntimeException) {
throw (RuntimeException) t;
}
else {
throw new RpcException("unexpected exception when ExecuteLimitFilter", t);
}
}
finally {
RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, isException);
}
}
Copy the code
When the Provider processes requests, it collects statistics on method dimension invocations. If the number of concurrent requests exceeds the maximum value, the Provider does not throw exceptions directly.