sequence
This article focuses on Dubbo’s ExecuteLimitFilter
ExecuteLimitFilter
Dubbo – 2.7.2 / dubbo – RPC/dubbo xml-rpc API/SRC/main/Java/org/apache/dubbo/RPC/filter/ExecuteLimitFilter Java
public class ExecuteLimitFilter extends ListenableFilter {
private static final String EXECUTELIMIT_FILTER_START_TIME = "execugtelimit_filter_start_time";
public ExecuteLimitFilter() { super.listener = new ExecuteLimitListener(); } @Override public Result invoke(Invoker<? > invoker, Invocation invocation) throws RpcException { URL url = invoker.getUrl(); String methodName = invocation.getMethodName(); int max = url.getMethodParameter(methodName, EXECUTES_KEY, 0);if(! RpcStatus.beginCount(url, methodName, max)) { throw new RpcException("Failed to invoke method " + invocation.getMethodName() + " in provider " +
url + ", cause: The service using threads greater than <dubbo:service executes=\"" + max +
"\" /> limited.");
}
invocation.setAttachment(EXECUTELIMIT_FILTER_START_TIME, String.valueOf(System.currentTimeMillis()));
try {
return invoker.invoke(invocation);
} catch (Throwable t) {
if (t instanceof RuntimeException) {
throw (RuntimeException) t;
} else {
throw new RpcException("unexpected exception when ExecuteLimitFilter", t); } } } static class ExecuteLimitListener implements Listener { @Override public void onResponse(Result appResponse, Invoker<? > invoker, Invocation invocation) { RpcStatus.endCount(invoker.getUrl(), invocation.getMethodName(), getElapsed(invocation),true); } @Override public void onError(Throwable t, Invoker<? > invoker, Invocation invocation) { RpcStatus.endCount(invoker.getUrl(), invocation.getMethodName(), getElapsed(invocation),false);
}
private long getElapsed(Invocation invocation) {
String beginTime = invocation.getAttachment(EXECUTELIMIT_FILTER_START_TIME);
returnStringUtils.isNotEmpty(beginTime) ? System.currentTimeMillis() - Long.parseLong(beginTime) : 0; }}}Copy the code
- ExecuteLimitFilter inherits from ListenableFilter, whose constructor initializes the Listener as ExecuteLimitListener
- The invoke method first calls rpcStatus. beginCount to see if it can pass. If it does not pass, RpcException is thrown. The onResponse or onError methods of the Listener are called back at the end of execution
- The onResponse and onError methods of the ExecuteLimitListener both call rpcStatus.endcount; In this method, the execugtelimit_filter_start_time value will be retrieved through the getelsed method and the execution time will be calculated
RpcStatus
Dubbo – 2.7.2 / dubbo – RPC/dubbo xml-rpc API/SRC/main/Java/org/apache/dubbo/RPC/RpcStatus. Java
public class RpcStatus { private static final ConcurrentMap<String, RpcStatus> SERVICE_STATISTICS = new ConcurrentHashMap<String, RpcStatus>(); private static final ConcurrentMap<String, ConcurrentMap<String, RpcStatus>> METHOD_STATISTICS = new ConcurrentHashMap<String, ConcurrentMap<String, RpcStatus>>(); private final ConcurrentMap<String, Object> values = new ConcurrentHashMap<String, Object>(); private final AtomicInteger active = new AtomicInteger(); private final AtomicLong total = new AtomicLong(); private final AtomicInteger failed = new AtomicInteger(); private final AtomicLong totalElapsed = new AtomicLong(); private final AtomicLong failedElapsed = new AtomicLong(); private final AtomicLong maxElapsed = new AtomicLong(); private final AtomicLong failedMaxElapsed = new AtomicLong(); private final AtomicLong succeededMaxElapsed = new AtomicLong(); / /... public static void beginCount(URL url, String methodName) { beginCount(url, methodName, Integer.MAX_VALUE); } /** * @param url */ public static boolean beginCount(URL url, String methodName, int max) { max = (max <= 0) ? Integer.MAX_VALUE : max; RpcStatus appStatus = getStatus(url); RpcStatus methodStatus = getStatus(url, methodName);if (methodStatus.active.incrementAndGet() > max) {
methodStatus.active.decrementAndGet();
return false;
} else {
appStatus.active.incrementAndGet();
return true;
}
}
/**
* @param url
* @param elapsed
* @param succeeded
*/
public static void endCount(URL url, String methodName, long elapsed, boolean succeeded) {
endCount(getStatus(url), elapsed, succeeded);
endCount(getStatus(url, methodName), elapsed, succeeded);
}
private static void endCount(RpcStatus status, long elapsed, boolean succeeded) {
status.active.decrementAndGet();
status.total.incrementAndGet();
status.totalElapsed.addAndGet(elapsed);
if (status.maxElapsed.get() < elapsed) {
status.maxElapsed.set(elapsed);
}
if (succeeded) {
if(status.succeededMaxElapsed.get() < elapsed) { status.succeededMaxElapsed.set(elapsed); }}else {
status.failed.incrementAndGet();
status.failedElapsed.addAndGet(elapsed);
if(status.failedMaxElapsed.get() < elapsed) { status.failedMaxElapsed.set(elapsed); }}} / /... }Copy the code
- The beginCount method of RpcStatus increments methodStatus.active and determines if it is greater than Max. If it is, it returns false and decrements methodStatus.active; If the value is less than or equal to appStatus.active; The endCount method decrements status.active and increments status.total, Then according to the success of updating the status. SucceededMaxElapsed or status. The failed, status. FailedElapsed, status. FailedMaxElapsed
summary
- ExecuteLimitFilter inherits from ListenableFilter, whose constructor initializes the Listener as ExecuteLimitListener
- The ExecuteLimitFilter invoke method first calls the rpcStatus. beginCount method to determine whether it can pass. If it does not pass, an RpcException is thrown. The onResponse or onError methods of the Listener are called back at the end of execution
- The onResponse and onError methods of the ExecuteLimitListener both call rpcStatus.endcount; In this method, the execugtelimit_filter_start_time value will be retrieved through the getelsed method and the execution time will be calculated
doc
- ExecuteLimitFilter