This article analyzes in detail how <dubbo:service executes="" /> and <dubbo:reference actives="" /> are implemented, and takes a close look at Dubbo's own protection mechanisms.
ExecuteLimitFilter @Activate(group = Constants.PROVIDER, value = Constants.EXECUTES_KEY)
- Filter role: Controls the concurrency of service execution on the provider side.
- Usage scenario: A protection mechanism implemented by the Dubbo service provider to limit the maximum concurrency of each service method.
- Block condition: When a service invocation exceeds the allowed concurrency, an RpcException is thrown directly.
Next, let's walk through the source code to see how ExecuteLimitFilter is implemented.
ExecuteLimitFilter#invoke
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
    URL url = invoker.getUrl();
    String methodName = invocation.getMethodName();
    Semaphore executesLimit = null;
    boolean acquireResult = false;
    int max = url.getMethodParameter(methodName, Constants.EXECUTES_KEY, 0); // @1
    if (max > 0) {
        RpcStatus count = RpcStatus.getStatus(url, invocation.getMethodName()); // @2
        executesLimit = count.getSemaphore(max); // @3
        if (executesLimit != null && !(acquireResult = executesLimit.tryAcquire())) { // @4
            throw new RpcException("Failed to invoke method " + invocation.getMethodName()
                    + " in provider " + url
                    + ", cause: The service using threads greater than <dubbo:service executes=\""
                    + max + "\" /> limited.");
        }
    }
    boolean isSuccess = true;
    try {
        Result result = invoker.invoke(invocation); // @5
        return result;
    } catch (Throwable t) {
        isSuccess = false;
        if (t instanceof RuntimeException) {
            throw (RuntimeException) t;
        } else {
            throw new RpcException("unexpected exception when ExecuteLimitFilter", t);
        }
    } finally {
        if (acquireResult) { // @6
            executesLimit.release();
        }
    }
}
Code @1: Gets the value of the executes parameter from the service provider URL. If it is less than or equal to 0, concurrency control is not enabled and the call simply continues along the invocation chain.
Code @2: Gets the RpcStatus for the service provider URL and the invoked method name.
public static RpcStatus getStatus(URL url, String methodName) {
    String uri = url.toIdentityString();
    ConcurrentMap<String, RpcStatus> map = METHOD_STATISTICS.get(uri);
    if (map == null) {
        METHOD_STATISTICS.putIfAbsent(uri, new ConcurrentHashMap<String, RpcStatus>());
        map = METHOD_STATISTICS.get(uri);
    }
    RpcStatus status = map.get(methodName);
    if (status == null) {
        map.putIfAbsent(methodName, new RpcStatus());
        status = map.get(methodName);
    }
    return status;
}
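The two-level lookup in getStatus can be tried out in isolation. Below is a minimal sketch, not Dubbo's code: Stats and StatsRegistry are hypothetical stand-ins for RpcStatus and its static map, and computeIfAbsent (Java 8+) replaces the putIfAbsent-then-get pair used above. Both approaches guarantee that concurrent callers end up sharing one instance per key.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical stand-in for RpcStatus; only object identity matters here. */
final class Stats {
}

/** Hypothetical registry mirroring the shape {service key: {method name: Stats}}. */
final class StatsRegistry {
    private static final ConcurrentMap<String, ConcurrentMap<String, Stats>> METHOD_STATS =
            new ConcurrentHashMap<>();

    private StatsRegistry() {
    }

    /** Returns the single Stats instance for (serviceKey, methodName), creating it on first use. */
    static Stats getStats(String serviceKey, String methodName) {
        return METHOD_STATS
                // Create the inner map atomically if this service has not been seen yet.
                .computeIfAbsent(serviceKey, k -> new ConcurrentHashMap<>())
                // Create the per-method entry atomically if this method has not been seen yet.
                .computeIfAbsent(methodName, k -> new Stats());
    }
}
```

Calling StatsRegistry.getStats("demo.Service", "hello") twice returns the same object, while a different method name yields a different one.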
This is a classic use of the concurrent container ConcurrentHashMap. METHOD_STATISTICS is a ConcurrentMap<String, ConcurrentMap<String, RpcStatus>> whose storage structure is {service provider URL identity string: {method name: RpcStatus}}.
Code @3: Creates the semaphore for the service method based on the maximum concurrency configured by the service provider.
public Semaphore getSemaphore(int maxThreadNum) {
    if (maxThreadNum <= 0) {
        return null;
    }
    if (executesLimit == null || executesPermits != maxThreadNum) {
        synchronized (this) {
            if (executesLimit == null || executesPermits != maxThreadNum) {
                executesLimit = new Semaphore(maxThreadNum);
                executesPermits = maxThreadNum;
            }
        }
    }
    return executesLimit;
}
Double-checked locking is used to create the executesLimit semaphore.
Code @4: If no permit can be acquired, the filter does not block and wait but throws an RpcException directly. The provider-side policy is to fail fast so that the service caller (consumer) can react according to its cluster policy, for example by retrying against another service provider.
Code @5: Performs the actual service call.
Code @6: Releases the permit after the service call, if one was successfully acquired.
Summary: <dubbo:service executes="" /> sets the maximum concurrency for each method of each service. If this value is exceeded, an RpcException is thrown directly.
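To make the fail-fast behavior concrete, here is a minimal sketch of the same pattern outside Dubbo. ExecuteLimiter and its call method are hypothetical names, and IllegalStateException stands in for RpcException. The two fields are declared volatile here, which is what makes the unsynchronized first check of double-checked locking safe. This is an assumption of the sketch, not a statement about how RpcStatus declares its fields.

```java
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

/** Hypothetical fail-fast limiter modeled on the flow of ExecuteLimitFilter. */
final class ExecuteLimiter {
    // volatile so the unsynchronized first check sees a fully constructed semaphore
    private volatile Semaphore limit;
    private volatile int permits;

    /** Lazily creates the semaphore, or replaces it when the configured maximum changes. */
    Semaphore getSemaphore(int max) {
        if (max <= 0) {
            return null; // limiting disabled
        }
        if (limit == null || permits != max) {          // first check, no lock
            synchronized (this) {
                if (limit == null || permits != max) {  // second check, under the lock
                    limit = new Semaphore(max);
                    permits = max;
                }
            }
        }
        return limit;
    }

    /** Runs the task if a permit is free; otherwise throws immediately without waiting. */
    <T> T call(int max, Supplier<T> task) {
        Semaphore semaphore = getSemaphore(max);
        boolean acquired = false;
        if (semaphore != null && !(acquired = semaphore.tryAcquire())) {
            throw new IllegalStateException("concurrency limit " + max + " exceeded");
        }
        try {
            return task.get();
        } finally {
            if (acquired) {
                semaphore.release(); // only release what was actually acquired
            }
        }
    }
}
```

With a limit of 1, a call made while another call is still running is rejected at once, and the permit becomes available again as soon as the first call returns.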
ActiveLimitFilter @Activate(group = Constants.CONSUMER, value = Constants.ACTIVES_KEY)
- Filter role: Controls the concurrency of service calls made by the consumer side.
- Usage scenario: Limits the number of concurrent calls that a single consumer makes to a service on a single provider. This value should normally be less than that of <dubbo:service executes="" />.
- Block condition: Calls are not rejected outright. When the allowed concurrency is exceeded, the call blocks and waits. If the timeout elapses first, the service is not invoked and a timeout exception is thrown instead.
ActiveLimitFilter#invoke
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
    URL url = invoker.getUrl();
    String methodName = invocation.getMethodName();
    int max = invoker.getUrl().getMethodParameter(methodName, Constants.ACTIVES_KEY, 0); // @1
    RpcStatus count = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()); // @2
    if (max > 0) {
        long timeout = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, 0); // @3
        long start = System.currentTimeMillis();
        long remain = timeout;
        int active = count.getActive(); // @4
        if (active >= max) { // @5
            synchronized (count) {
                while ((active = count.getActive()) >= max) {
                    try {
                        count.wait(remain);
                    } catch (InterruptedException e) {
                    }
                    long elapsed = System.currentTimeMillis() - start;
                    remain = timeout - elapsed;
                    if (remain <= 0) { // @6
                        throw new RpcException("Waiting concurrent invoke timeout in client-side for service: "
                                + invoker.getInterface().getName() + ", method: "
                                + invocation.getMethodName() + ", elapsed: " + elapsed
                                + ", timeout: " + timeout + ". concurrent invokes: " + active
                                + ". max concurrent invoke limit: " + max);
                    }
                }
            }
        }
    }
    try {
        long begin = System.currentTimeMillis();
        RpcStatus.beginCount(url, methodName); // @7
        try {
            Result result = invoker.invoke(invocation); // @8
            RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, true); // @9
            return result;
        } catch (RuntimeException t) {
            RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, false);
            throw t;
        }
    } finally {
        if (max > 0) {
            synchronized (count) {
                count.notify(); // @10
            }
        }
    }
}
Code @1: Gets the actives parameter configured in the consumer-side URL from the Invoker. Why is the URL obtained from the Invoker the consumer-side URL? Because when the consumer creates an Invoker from a service provider URL, it starts from the provider URL and then merges in the consumer-side configuration, with priority -D > consumer > provider. See RegistryDirectory#toInvokers: URL url = mergeUrl(providerUrl);
Code @2: Gets the RpcStatus for the service provider URL and the invoked method name.
Code @3: Gets the timeout of the interface call. Dubbo's default timeout is 1 s. Note, however, that this particular call passes 0 as the fallback when the URL carries no timeout.
Code @4: Gets the active value, that is, the current number of concurrent calls from this consumer to a particular method of a particular service.
Code @5: If the current number of concurrent calls is greater than or equal to the maximum allowed, the filter synchronizes on the RpcStatus object and calls its wait(timeout). If the waiting thread is not woken up within the interface timeout, a timeout exception is thrown directly.
Code @6: After waking up, determines whether the wait ended because it timed out or because another call finished and released its "quota". If it timed out, an exception is thrown.
Code @7: Before the service invocation, adds one to the active value of the RpcStatus for the service name + method name.
Code @8: Performs the RPC service call.
Code @9: Records whether the call succeeded or failed and decrements active by one.
Code @10: Finally, whether or not the call succeeded, wakes up one waiting thread if actives is enabled (<dubbo:reference actives="" />).
Summary: <dubbo:reference actives="" /> controls the maximum concurrency allowed from a single consumer to a single service provider for a single service method. This value should not be greater than that of <dubbo:service executes="" />, and setting it is not recommended when consumer machines are configured differently.
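The wait-with-deadline logic behind @5, @6 and @10 can be sketched on its own. ActiveLimiter below is a hypothetical class, not part of Dubbo, and it departs from the filter in three deliberate ways: the check and the increment happen under the same lock, the remaining time is tested before each wait so that wait(0) (which would wait indefinitely) is never called, and an interrupt is surfaced instead of swallowed. IllegalStateException stands in for RpcException.

```java
/** Hypothetical blocking limiter modeled on the wait/notify flow of ActiveLimitFilter. */
final class ActiveLimiter {
    private int active; // guarded by "this"

    /** Blocks until a slot is free; throws if none frees up within timeoutMillis. */
    synchronized void acquire(int max, long timeoutMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (active >= max) {
            long remain = deadline - System.currentTimeMillis();
            if (remain <= 0) {
                throw new IllegalStateException(
                        "timed out waiting for a slot, active=" + active + ", max=" + max);
            }
            try {
                wait(remain); // releases the lock while waiting for release() to notify
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                throw new IllegalStateException("interrupted while waiting for a slot", e);
            }
        }
        active++; // slot taken while still holding the lock
    }

    /** Frees a slot and wakes up one waiting caller, like count.notify() at @10. */
    synchronized void release() {
        active--;
        notify();
    }

    synchronized int getActive() {
        return active;
    }
}
```

A caller would wrap the real invocation in acquire(max, timeout) and a finally block that calls release(), mirroring steps @7 through @10.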