At the beginning
In the last section, we talked about service export, that is, how the server provides its interface as a dubbo service. This section is about service invocation. How does the consumer invoke the interface of the server?
The main process
1. When spring starts, the attribute of the @Reference annotation will be assigned a value, and the referenceBean.get method will be called when the value is assigned
2. Prepare to initialize the Invoker object,MockClusterInvoker, which is the end goal
3. Initialize the service directory RegistryDirectory in the registry
4. Register consumer information with zK
5. Construct routing chains and service subscriptions
6. Obtain the final Invoker object MockClusterInvoker based on the service catalog
7. The final call MockClusterInvoker. Invoke method execution request to send data, it calls the netty. The send method
8. Run the Netty ServerHandler method to process the request and return the result through the Netty Channel
Source process
Flowchart address:www.processon.com/view/link/6…
1. Program entry
When spring start to @ Reference annotation attribute assignment, generate ReferenceBean, in ReferenceAnnotationBeanPostProcessor. DoGetInjectedBean methods as you can see, The referenceBean.get() method is finally called, which returns a Ref object that is seen as an Invoke proxy object, the second step of the main process, ready to initialize the Invoker object,MockClusterInvoker, Generating this is the end goal
@Override protected Object doGetInjectedBean(AnnotationAttributes attributes, Object bean, String beanName, Class<? > injectedType, InjectionMetadata.InjectedElement injectedElement) throws Exception { return getOrCreateProxy(referencedBeanName, referenceBeanName, referenceBean, injectedType); } private Object getOrCreateProxy(String referencedBeanName, String referenceBeanName, ReferenceBean referenceBean, Class<? > serviceInterfaceType) { if (existsServiceBean(referencedBeanName)) { // If the local @Service Bean exists, build a proxy of ReferenceBean return newProxyInstance(getClassLoader(), new Class[]{serviceInterfaceType}, wrapInvocationHandler(referenceBeanName, referenceBean)); } else {// ReferenceBean should be initialized and get immediately; } } public synchronized T get() { checkAndUpdateSubConfigs(); if (destroyed) { throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!" ); } if (ref == null) {// init(); } return ref; // Invoke proxy}Copy the code
2. Prepare to initialize the Invoker object MockClusterInvoker
Init ()->createProxy(map), this method is too long, leaving three main methods: 1. Invoker = ref_protocol. refer Calls registry.refer, which is the SPI mechanism, and finally calls registryprotocol. refer
private T createProxy(Map<String, String> map) {
List<URL> us = loadRegistries(false);
invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
invoker = CLUSTER.join(new StaticDirectory(u, invokers));
}
Copy the code
3. Initialize the service directory RegistryDirectory in the registry, leaving the main code, and you can see that this initializes a RegistryDirectory, which is the consumers node folder that we eventually see on zk. registry.register(directory.getRegisteredConsumerUrl()); Here will call ZookeeperRegistry doRegister method, using zk client to zk server create nodes, the consumption side information registration to zk, you can see here is temporary node created
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) { RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url); directory.setRegistry(registry); directory.setProtocol(protocol); registry.register(directory.getRegisteredConsumerUrl()); directory.buildRouterChain(subscribeUrl); directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY, return invoker; } @Override public void doRegister(URL url) { try { zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true)); } catch (Throwable e) { throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); }}Copy the code
4. Routing chain structure, service subscription directory. BuildRouterChain (subscribeUrl); directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY
3. Generate the final Invoker object MockClusterInvoker
Invoker invoker = cluster.join(directory);
Copy the code
This is the SPI mechanism again. Since Cluster has a wrapper class, the MockClusterWrapper. Join method is called first
public class MockClusterWrapper implements Cluster { private Cluster cluster; public MockClusterWrapper(Cluster cluster) { this.cluster = cluster; } @Override public <T> Invoker<T> join(Directory<T> directory) throws RpcException { return new MockClusterInvoker<T>(directory, this.cluster.join(directory)); }}Copy the code
4. Service invocation
Generated in step 3 a MockClusterInvoker object, so in the end to invoke the service method is actually called MockClusterInvoker. The invoke method, Will in turn invoke AbstractClusterInvoker. Invoke – > FailoverClusterInvoker. DoInvoke – > DubboInvoker. DoInvoke
@Override public Result invoke(Invocation invocation) throws RpcException { Result result = null; String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), MOCK_KEY, Boolean.FALSE.toString()).trim(); if (value.length() == 0 || "false".equalsIgnoreCase(value)) { //no mock result = this.invoker.invoke(invocation); } else if (value.startsWith("force")) { if (logger.isWarnEnabled()) { logger.warn("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + directory.getUrl()); } //force:direct mock result = doMockInvoke(invocation, null); } else { //fail-mock try { result = this.invoker.invoke(invocation); //fix:#4585 if(result.getException() ! = null && result.getException() instanceof RpcException){ RpcException rpcException= (RpcException)result.getException(); if(rpcException.isBiz()){ throw rpcException; }else { result = doMockInvoke(invocation, rpcException); } } } return result; }Copy the code
Let’s look directly at the DubBoInvoker.doInvoke method 1. First get a ExchangeClient client 2. Asynchronous request currentClient. Request, eventually call HeaderExchangeChannel. Call the netty method request – > channel. Send
@Override protected Result doInvoke(final Invocation invocation) throws Throwable { RpcInvocation inv = (RpcInvocation) invocation; final String methodName = RpcUtils.getMethodName(invocation); inv.setAttachment(PATH_KEY, getUrl().getPath()); inv.setAttachment(VERSION_KEY, version); ExchangeClient currentClient; if (clients.length == 1) { currentClient = clients[0]; } else { currentClient = clients[index.getAndIncrement() % clients.length]; } try { boolean isOneway = RpcUtils.isOneway(getUrl(), invocation); int timeout = getUrl().getMethodPositiveParameter(methodName, TIMEOUT_KEY, DEFAULT_TIMEOUT); if (isOneway) { boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false); currentClient.send(inv, isSent); return AsyncRpcResult.newDefaultAsyncResult(invocation); } else { AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv); CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout); asyncRpcResult.subscribeTo(responseFuture); return asyncRpcResult; }}}Copy the code
5. Service request processing
Due to the use of netty communication, after all the client sends the message, netty server will be NettyServerHandler. ChannelRead received messages, call here a lot of handler, will not spread. 1.MultiMessageHandler 2.HeartbeatHandler 3.AllChannelHandler 4.DecodeHandler 5.HeaderExchangeHandler 6.ExchangeHandlerAdapter
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler); try { handler.received(channel, msg); } finally { NettyChannel.removeChannelIfDisconnected(ctx.channel()); }}Copy the code
conclusion
The purpose of introducing the service is to annotate a server interface on the consumer side with @Reference. This annotation will register the consumer side message with ZK, and eventually generate a proxy object invoker that calls the server side interface. When the consumer invokes the server side interface, the invoker. The communication framework adopted by this method is NetTY, which realizes the remote call. Dubbo source code is well written, such as the SPI mechanism inside the use of very clever, there are some abstract factory design patterns, etc., the source code is worth reading.