Remote invocation — the beginning
Objective: To explain how to arrange the content of the remote call module, to introduce the package structure design in Dubbo-RPC-API, and the outermost source code parsing.
preface
Recently, I was faced with a choice, because Dubbo 2.7.0-release appeared in the warehouse, and I have been conducting code review of 2.7.0 recently. Then I said that this series of articles were all about the source code of 2.6.x version. Should I now choose to go straight to the 2.7.0 version of the source code? I decided to continue with 2.6.x because I felt that there were still a lot of companies using 2.6.x, and that it would not be too soon to plan for 2.7.0, and that it would be easier to understand the new features of 2.7.0 after understanding how 2.6.x worked, and to get a sense of what the designers intended. Of course, after the important module of 2.6.x, I will also do a comprehensive analysis of the new features and implementation principles of 2.7.0, as the graduation version of the Dubbo community, more powerful, stay tuned.
Hello,Dubbo Hello,Dubbo Hello,Dubbo Hello,Dubbo Hello,Dubbo Hello,Dubbo Hello,Dubbo If the application deployed on server A wants to invoke methods such as the application deployed on server B, the invocation cannot be performed because the application does not have the same memory space. The dubbo RPC remote call module abstracts various protocols, as well as the dynamic Proxy, Proxy layer and Protocol layer RPC core. I’ll talk about that in this series. Here are two images from official documents:
- Sequence diagram of exposed services:
You’ll find Transporter, Server, Registry, which we talked about earlier, and this series will look at those in the red box.
- Reference service sequence diagrams
In the reference service sequence diagram, the corresponding part is also the red box.
After reading through the series, I hope to get a feel for this call chain. Let’s look at the package structure of Dubo-RPC:
As you can see, there are many neat packages, among which Dubbo-Rpc-API is the abstraction and implementation of protocols, exposures, references, proxies, and so on, and is the core of RPC’s overall design. The other packages are the nine protocols dubbo supports, which can also be seen in the official documentation, and include a local call to InjVM. Let’s look at the package structure in Dubo-rpc-API:
- Filter package: A series of filters are performed when service references are made. It includes a lot of filters.
- Listener package: If you look at the sequence diagrams of service references and service exposures above, you can see that there are two Listeners in which the logical implementation is in this package
- Protocol package: This package implements some of the common logic of the protocol
- Proxy package: Implements the proxy logic.
- Service package: Contains encapsulating abstractions such as a method to be invoked.
- Support package: includes utility classes
- Outermost implementation.
The following space design, this article will explain the outermost source code and the source code under service, support package source code I will intersperse in other places to explain, filter, listener, protocol, proxy and various protocol implementation with one.
Source code analysis
(a) Invoker
public interface Invoker<T> extends Node {
/** * get service interface@return service interface.
*/
Class<T> getInterface(a);
/** invoke. * Invoke the next session domain *@param invocation
* @return result
* @throws RpcException
*/
Result invoke(Invocation invocation) throws RpcException;
}
Copy the code
This interface is the entity domain, the core dubbo model to which other models converge or transform, and it represents an executable to which invoke calls can be made, either as a local implementation, a remote implementation, or a clustered implementation. It represents a call
(2) the Invocation
public interface Invocation {
/** * get method name@return method name.
* @serial* /
String getMethodName(a);
/** * get parameter types@return parameter types.
* @serial* /Class<? >[] getParameterTypes();/** * get arguments@return arguments.
* @serial* /
Object[] getArguments();
/** * attachments@return attachments.
* @serial* /
Map<String, String> getAttachments(a);
/** * get attachment by key@return attachment value.
* @serial* /
String getAttachment(String key);
/** * get attachment by key with default value@return attachment value.
* @serial* /
String getAttachment(String key, String defaultValue);
/** * get the invoker in current context@return invoker.
* @transient* /Invoker<? > getInvoker(); }Copy the code
The Invocation is the session domain, which holds variables during Invocation, such as method names, parameters, etc.
(3) of the Exporter
public interface Exporter<T> {
/** * get invoker. * Get invoker@return invoker
*/
Invoker<T> getInvoker(a);
/** * unexport. * * * getInvoker().destroy(); *
*/
void unexport(a);
}
Copy the code
This interface is the exposed service interface and defines two methods: get invoker and unexpose service.
(4) ExporterListener
@SPI
public interface ExporterListener {
/** * The exporter exporting@param exporter
* @throws RpcException
* @see com.alibaba.dubbo.rpc.Protocol#export(Invoker)
*/
void exported(Exporter
exporter) throws RpcException;
/** * The exporter unexported@param exporter
* @throws RpcException
* @see com.alibaba.dubbo.rpc.Exporter#unexport()
*/
void unexported(Exporter
exporter);
}
Copy the code
This interface is a listener interface for service exposure and defines two methods that are exposed and unexposed, with arguments of type Exporter.
(5) Protocol
@SPI("dubbo")
public interface Protocol {
/** * Get default port when user doesn't config the port@return default port
*/
int getDefaultPort(a);
/** * Export service for remote invocation: <br> * 1. Protocol should record request source address after receive a request: * RpcContext.getContext().setRemoteAddress(); <br> * 2. export() must be idempotent, that is, there's no difference between invoking once and invoking twice when * export the same URL<br> * 3. Invoker instance is Passed in by the framework, protocol needs not to care < BR > * Exposes service methods, *@param<T> Service type Service type *@paramInvoker Service Entity domain of the Invoker Service *@return exporter reference for exported service, useful for unexport the service later
* @throws RpcException thrown when error occurs during export the service, for example: port is occupied
*/
@Adaptive
<T> Exporter<T> export(Invoker<T> invoker) throws RpcException;
/** * Refer a remote service: <br> * 1. When user calls `invoke()` method of `Invoker` object which's returned from `refer()` call, the protocol * needs to correspondingly execute `invoke()` method of `Invoker` object <br> * 2. It's protocol's responsibility to implement `Invoker` which's returned from `refer()`. Generally speaking, * protocol sends remote request in the `Invoker` implementation. <br> * 3. When there's check=false set in URL, The implementation must not throw exception but try to recover when * connection fails. * Reference service method *@param<T> Service type Service type *@paramType Service class Name of the Service class *@param url URL address for the remote service
* @return invoker service's local proxy
* @throws RpcException when there's any error while connecting to the service provider
*/
@Adaptive
<T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;
/** * Destroy protocol:
* 1. Cancel all services this protocol exports and refers
* 2. Release all occupied resources, for example: connection, port, etc.
* 3. Protocol can continue to export and refer new service even after it's destroyed. */
void destroy(a);
}
Copy the code
This interface is both a service domain interface and a protocol interface. It is an extensible interface that implements the DuBBo protocol by default. Four methods are defined, the key being service exposure and reference.
(6) of the Filter
@SPI
public interface Filter {
/**
* do invoke filter.
* <p>
* <code>
* // before filter
* Result result = invoker.invoke(invocation);
* // after filter
* return result;
* </code>
*
* @param invoker service
* @param invocation invocation.
* @return invoke result.
* @throws RpcException
* @see com.alibaba.dubbo.rpc.Invoker#invoke(Invocation)
*/
Result invoke(Invoker
invoker, Invocation invocation) throws RpcException;
}
Copy the code
This interface is the Invoker call-time filter interface, where there is only one Invoke method. The calls are filtered in this method
(7) InvokerListener
@SPI
public interface InvokerListener {
/** * The invoker referred to in this document *@param invoker
* @throws RpcException
* @see com.alibaba.dubbo.rpc.Protocol#refer(Class, com.alibaba.dubbo.common.URL)
*/
void referred(Invoker
invoker) throws RpcException;
/** * The Invoker destroyed@param invoker
* @see com.alibaba.dubbo.rpc.Invoker#destroy()
*/
void destroyed(Invoker
invoker);
}
Copy the code
This interface is a listener for the entity domain and defines two methods, one for service reference and the other for destruction.
(8) the Result
This interface is the result interface of the invoke execution of the entity domain, which defines methods such as obtaining the result exception and value added. I won’t post the code if it’s easier to understand.
ProxyFactory (9)
@SPI("javassist")
public interface ProxyFactory {
/** * create proxy@param invoker
* @return proxy
*/
@Adaptive({Constants.PROXY_KEY})
<T> T getProxy(Invoker<T> invoker) throws RpcException;
/** * create proxy@param invoker
* @return proxy
*/
@Adaptive({Constants.PROXY_KEY})
<T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException;
/** * create invoker. * Create an entity domain@param <T>
* @param proxy
* @param type
* @param url
* @return invoker
*/
@Adaptive({Constants.PROXY_KEY})
<T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException;
}
Copy the code
Dubbo provides two dynamic proxy methods: Javassist/JDK. This interface defines three methods, the first two are to create the proxy through Invoker, and the last is to obtain the invoker through the proxy.
RpcContext (10)
This class is the context for the remote call, running through the call, for example A calling B, and then B calling C. On service B, RpcContext saves the call information from A to B before B. Start calling C and save the call information from B to C after B calls C. RpcContext holds the call information.
public class RpcContext {
/** * use internal thread local to improve performance */
private static final InternalThreadLocal<RpcContext> LOCAL = new InternalThreadLocal<RpcContext>() {
@Override
protected RpcContext initialValue(a) {
return newRpcContext(); }};/**
* 服务上下文
*/
private static final InternalThreadLocal<RpcContext> SERVER_LOCAL = new InternalThreadLocal<RpcContext>() {
@Override
protected RpcContext initialValue(a) {
return newRpcContext(); }};/** ** value set */
private final Map<String, String> attachments = new HashMap<String, String>();
/** * Context value */
private final Map<String, Object> values = new HashMap<String, Object>();
/** * Thread result */
privateFuture<? > future;/** * url set */
private List<URL> urls;
/** * the current URL */
private URL url;
/** * Method name */
private String methodName;
/** * Parameter type set */
privateClass<? >[] parameterTypes;/** * Parameter set */
private Object[] arguments;
/** * local address */
private InetSocketAddress localAddress;
/**
* 远程地址
*/
private InetSocketAddress remoteAddress;
/** * Entity domain set */
@Deprecated
privateList<Invoker<? >> invokers;/**
* 实体域
*/
@Deprecated
privateInvoker<? > invoker;/** * Session domain */
@Deprecated
private Invocation invocation;
// now we don't use the 'values' map to hold these objects
// we want these objects to be as generic as possible
/** * request */
private Object request;
/** * response */
private Object response;
Copy the code
The most important properties of this class are some of its properties, because this context is used to hold information. I’m not going to do it because it’s easy.
(11) RpcException
/** * do not know the exception */
public static final int UNKNOWN_EXCEPTION = 0;
/** * Network exception */
public static final int NETWORK_EXCEPTION = 1;
/** * Timeout exception */
public static final int TIMEOUT_EXCEPTION = 2;
/** * Base exception */
public static final int BIZ_EXCEPTION = 3;
/** * Forbid access exception */
public static final int FORBIDDEN_EXCEPTION = 4;
/** * serialization exception */
public static final int SERIALIZATION_EXCEPTION = 5;
Copy the code
This is an exception class thrown by RPC calls that encapsulates five common error codes.
(12) RpcInvocation
/** * Method name */
private String methodName;
/** * Parameter type set */
privateClass<? >[] parameterTypes;/** * Parameter set */
private Object[] arguments;
/** * added value */
private Map<String, String> attachments;
/** * entity domain */
private transientInvoker<? > invoker;Copy the code
This class implements the Invocation interface and is the session domain of RPC. The method is simple and encapsulates the above properties.
RpcResult
/** * result */
private Object result;
/** * exception */
private Throwable exception;
/** * added value */
private Map<String, String> attachments = new HashMap<String, String>();
Copy the code
This class implements the Result interface, which is the Result implementation class of RPC. The key is to encapsulate the above three attributes.
RpcStatus
This class is a set of RPC state monitors that encapsulate a number of counters to record the state of RPC calls.
1. The attribute
/** * urIs corresponding to the state set, key is the URI, value is the RpcStatus object */
private static final ConcurrentMap<String, RpcStatus> SERVICE_STATISTICS = new ConcurrentHashMap<String, RpcStatus>();
/** * method state set, key is uri, second key is method name */
private static final ConcurrentMap<String, ConcurrentMap<String, RpcStatus>> METHOD_STATISTICS = new ConcurrentHashMap<String, ConcurrentMap<String, RpcStatus>>();
/** ** is no longer useful */
private final ConcurrentMap<String, Object> values = new ConcurrentHashMap<String, Object>();
/**
* 活跃状态
*/
private final AtomicInteger active = new AtomicInteger();
/** * total number */
private final AtomicLong total = new AtomicLong();
/** * Number of failures */
private final AtomicInteger failed = new AtomicInteger();
/** * Total call duration */
private final AtomicLong totalElapsed = new AtomicLong();
/** * Total call failure duration */
private final AtomicLong failedElapsed = new AtomicLong();
/** * Maximum call duration */
private final AtomicLong maxElapsed = new AtomicLong();
/** * Maximum call failure duration */
private final AtomicLong failedMaxElapsed = new AtomicLong();
/** * Maximum call success duration */
private final AtomicLong succeededMaxElapsed = new AtomicLong();
/** * Semaphore used to control concurrency limits set by 'execution
private volatile Semaphore executesLimit;
/** * License used to control 'execution' Settings */
private volatile int executesPermits;
Copy the code
This is a property of this class. You can see that many counters are saved, which are used to record the cumulative number of failed calls and successful calls.
2.beginCount
/** * start counting *@param url
*/
public static void beginCount(URL url, String methodName) {
// Increment the active counter to the URL pair
beginCount(getStatus(url));
// Add one to the active counter for this method
beginCount(getStatus(url, methodName));
}
/** * atomic increment 1 *@param status
*/
private static void beginCount(RpcStatus status) {
status.active.incrementAndGet();
}
Copy the code
The method is to increment the count.
3.endCount
public static void endCount(URL url, String methodName, long elapsed, boolean succeeded) {
// the state in which the url corresponds decreases the counter by one
endCount(getStatus(url), elapsed, succeeded);
// The counter in the corresponding state of the method decreases by one
endCount(getStatus(url, methodName), elapsed, succeeded);
}
private static void endCount(RpcStatus status, long elapsed, boolean succeeded) {
// The active counter decreases by one
status.active.decrementAndGet();
// The total counter increases by 1
status.total.incrementAndGet();
// Total call duration plus call duration
status.totalElapsed.addAndGet(elapsed);
// If the maximum call time is less than Elapsed, set the maximum call time
if (status.maxElapsed.get() < elapsed) {
status.maxElapsed.set(elapsed);
}
// If the RPC call succeeds
if (succeeded) {
// If elapsed is less than the maximum call success time, set the maximum call success time
if(status.succeededMaxElapsed.get() < elapsed) { status.succeededMaxElapsed.set(elapsed); }}else {
// The failure counter increases by one
status.failed.incrementAndGet();
// The elapsed time plus Elapsed
status.failedElapsed.addAndGet(elapsed);
// If the total elapsed time is less than elapsed, set the total elapsed time
if(status.failedMaxElapsed.get() < elapsed) { status.failedMaxElapsed.set(elapsed); }}}Copy the code
The method is counter decrement.
StaticContext
This class is the system context and is for internal use only.
/** * System name */
private static final String SYSTEMNAME = "system";
/** * System context collection, for internal use only */
private static final ConcurrentMap<String, StaticContext> context_map = new ConcurrentHashMap<String, StaticContext>();
/** * System context name */
private String name;
Copy the code
Above are the properties of the class, which also records all of the system context collections.
(16) EchoService
public interface EchoService {
/** * echo test@param message message.
* @return message.
*/
Object $echo(Object message);
}
Copy the code
The interface is the echo service interface, defines a echoes a test method, test of the echo service is available for detection, testing of the echo request according to normal process execution, to test the whole call whether unobstructed, can be used to monitor, all services to realize the interface automatically, just arbitrary force into the EchoService service, can use.
GenericException
This method is a generic exception class.
/** * Exception class name */
private String exceptionClass;
/** * Exception message */
private String exceptionMessage;
Copy the code
It’s relatively simple, encapsulating two attributes.
GenericService
public interface GenericService {
/** * Generic Invocation * Generic session domain *@param method Method name, e.g. findPerson. If there are overridden methods, parameter info is
* required, e.g. findPerson(java.lang.String)
* @param parameterTypes Parameter types
* @param args Arguments
* @return invocation return value
* @throws Throwable potential exception thrown from the invocation
*/
Object $invoke(String method, String[] parameterTypes, Object[] args) throws GenericException;
}
Copy the code
This interface is a generic service interface that also defines an Invoke-like method
Afterword.
The source code for this section is github.com/CrazyHZM/in…
Dubbo-rpc-api (dubbo-RPC-API) (dubbo-RPC-API) (Dubbo rPC-API) (Dubbo RPC-API) (Dubbo RPC-API) (Dubbo RPC-API) (Dubbo RPC-API) (Dubbo RPC-API) (Dubbo RPC-API) (Dubbo RPC-API) (Dubbo RPC-API) Many of these interface definitions need to be clarified. Next, I’ll start explaining RPC module filters.