Introduction: If a programmer clearly understands the elements of an RPC framework and masters the technologies involved, such as service registration and discovery, load balancing, serialization protocols, RPC communication protocols, socket communication, asynchronous invocation, and circuit breaking and degradation, their fundamentals will improve across the board. Although relevant open-source implementations exist, only reading the source code makes it easy to overestimate one's understanding; writing one from scratch is the real path to mastering this technology.

Source: the Alibaba technology official account

Preface

Why write an RPC framework ourselves? In my opinion, from the perspective of personal growth, a programmer who clearly understands the elements of an RPC framework and masters the technologies involved, such as service registration and discovery, load balancing, serialization protocols, RPC communication protocols, socket communication, asynchronous invocation, and circuit breaking and degradation, will improve their fundamentals across the board. Although relevant open-source implementations exist, only reading the source code makes it easy to overestimate one's understanding; writing one from scratch is the real path to mastering this technology.

What is RPC

Remote Procedure Call (RPC), simply put, lets you call a remote service as if it were a local method. Widely used RPC frameworks include gRPC, Dubbo, and Spring Cloud. I believe you are already familiar with the concept of RPC, so I will not introduce it further here.

Distributed RPC framework elements

A distributed RPC framework is inseparable from three basic elements:

  • The service provider (Service Provider)
  • The service consumer (Service Consumer)
  • The registry (Registry)

Service routing, load balancing, service circuit breaking and degradation, serialization protocols, communication protocols, and so on are all extensions built around these three basic elements.

1 Registry

The registry handles service registration and discovery. Although a service call goes directly from the consumer to the provider, services today are clustered and the number of providers changes dynamically, so service addresses cannot be determined in advance. Discovering these services therefore requires a unified registry to host them.

2 Service Provider (RPC server)

It provides service interfaces to the outside world. On startup, it connects to the registry and sends its service names and service metadata to the registry. It also needs a mechanism for taking services offline, must maintain the mapping from service names to real service addresses, and must start a socket server to listen for client requests.

3 Service consumer (RPC client)

The client needs the basic ability to obtain services from the registry. On startup, it scans the RPC services it depends on and generates proxy invocation objects for them. At the same time, it pulls service metadata from the registry into a local cache and subscribes to changes of each service so the cache stays up to date. When a service is invoked, the call goes through the proxy object: the service address list is obtained from the local cache, a load balancing strategy selects a target address, the request data is serialized, and the call is made over a socket using the agreed communication protocol.

Technology Selection

1 Registry

The mature registries currently available are Zookeeper, Nacos, Consul, and Eureka; their main points of comparison are as follows:

Two registries, Nacos and Zookeeper, are supported in this implementation and can be switched according to configuration.

2 I/O communication framework

This implementation uses Netty as the underlying communication framework. Netty is a high-performance, event-driven, non-blocking I/O (NIO) framework.

3 Communication Protocol

During TCP communication, data is split into packets according to the actual state of the TCP buffer, so one complete application message may be split across multiple TCP packets, or several small messages may be bundled into one large packet. This is the well-known TCP sticky-packet and packet-splitting problem. The data we send must therefore be framed by a communication protocol.

The major protocol solutions in the industry can be summarized as follows:

  1. The length of a message is fixed. For example, each packet is a fixed length of 100 bytes.
  2. The special end character at the end of the packet is used for segmentation.
  3. A message is divided into a header and a body, and the header contains fields that represent the total length of the message (or the body length of the message).

Obviously, both 1 and 2 have some limitations. This implementation adopts scheme 3, and the specific protocol design is as follows:

+--------+---------+--------+----------------+----------------+
| magic  | version | type   | content length | content byte[] |
+--------+---------+--------+----------------+----------------+
| 1 byte | 1 byte  | 1 byte | 4 bytes        | variable       |
+--------+---------+--------+----------------+----------------+
  • The first byte is the magic number, which I define as 0X35.
  • The second byte represents the protocol version number so that the protocol can be extended to use a different protocol parser.
  • The third byte is the request type, such as 0 for request and 1 for response.
  • The fourth field (4 bytes) is the message length; the bytes that follow it are the message content.
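To make the framing concrete, here is a small self-contained sketch of encoding and decoding one frame with java.nio.ByteBuffer. The real implementation uses Netty's ByteBuf; ProtocolCodecSketch and its method names are illustrative, not the project's classes.

```java
import java.nio.ByteBuffer;

// Mirrors the layout above: magic (1) | version (1) | type (1) | length (4) | content
public class ProtocolCodecSketch {
    static final byte MAGIC = 0x35;
    static final int HEADER_LEN = 7;

    public static byte[] encode(byte version, byte type, byte[] content) {
        ByteBuffer buf = ByteBuffer.allocate(HEADER_LEN + content.length);
        buf.put(MAGIC).put(version).put(type);
        buf.putInt(content.length);   // 4-byte length field
        buf.put(content);
        return buf.array();
    }

    public static byte[] decodeContent(byte[] frame) {
        ByteBuffer buf = ByteBuffer.wrap(frame);
        if (buf.get() != MAGIC) throw new IllegalArgumentException("bad magic");
        buf.get(); // skip version
        buf.get(); // skip type
        int len = buf.getInt();
        byte[] content = new byte[len];
        buf.get(content);
        return content;
    }
}
```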

4 Serialization Protocol

This implementation supports three serialization protocols, JavaSerializer, Protobuf, and Hessian, selectable via configuration. Protobuf is recommended: its serialized stream is small and its performance high, which suits RPC calls very well, and Google's own gRPC also uses it.
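As an illustration of how the serialization protocols can be made pluggable, here is a minimal sketch using JDK serialization; the Serializer interface and JavaSerializer names are assumptions for this example, not necessarily the project's real types. A Protobuf or Hessian implementation would plug into the same interface.

```java
import java.io.*;

// Hypothetical pluggable serialization abstraction.
interface Serializer {
    byte[] serialize(Object obj) throws IOException;
    <T> T deserialize(byte[] bytes, Class<T> clazz) throws IOException, ClassNotFoundException;
}

// JDK-serialization implementation; chosen here only because it needs no dependencies.
class JavaSerializer implements Serializer {
    @Override
    public byte[] serialize(Object obj) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(obj);
        }
        return bos.toByteArray();
    }

    @Override
    @SuppressWarnings("unchecked")
    public <T> T deserialize(byte[] bytes, Class<T> clazz) throws IOException, ClassNotFoundException {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (T) ois.readObject();
        }
    }
}
```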

5 Load Balancing

This implementation supports two main load balancing strategies, random and round-robin, and each also has a weighted variant, making four strategies in total.
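A minimal sketch of the two strategy families follows; the class and method names are illustrative, and the project's real load balancer implementations may look different.

```java
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical address model: each provider address carries a weight.
class WeightedAddress {
    final String address;
    final int weight;
    WeightedAddress(String address, int weight) { this.address = address; this.weight = weight; }
}

class LoadBalancers {
    private static final AtomicInteger cursor = new AtomicInteger();

    // Plain round-robin: step a shared cursor through the address list.
    static WeightedAddress roundRobin(List<WeightedAddress> list) {
        int idx = Math.floorMod(cursor.getAndIncrement(), list.size());
        return list.get(idx);
    }

    // Weighted random: pick a point in [0, totalWeight) and walk the list,
    // subtracting weights until the point falls inside an address's range.
    static WeightedAddress weightedRandom(List<WeightedAddress> list) {
        int total = 0;
        for (WeightedAddress a : list) total += a.weight;
        int point = ThreadLocalRandom.current().nextInt(total);
        for (WeightedAddress a : list) {
            point -= a.weight;
            if (point < 0) return a;
        }
        return list.get(list.size() - 1);
    }
}
```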

Overall Structure

Implementation

Overall project structure:

1 Service registration discovery

Zookeeper

Zookeeper uses a tree-of-nodes data model similar to the Linux file system, with paths such as /, /node1, and /node2.

Zookeeper's node types are the core mechanism behind many of its features. There are three types: persistent nodes, temporary (ephemeral) nodes, and sequential nodes.

We create a persistent node for each service name. When a service registers, it actually creates a temporary node under that persistent node in ZooKeeper, storing the service's IP address, port, serialization mode, and so on.
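The ZooKeeper snippets themselves appear as images in the original post. As a rough sketch of the node layout just described, here the /orc-rpc root path and the host:port child-node naming are assumptions for illustration, not necessarily what the project uses:

```java
// Hypothetical layout: /orc-rpc/{serviceName}/{host:port}
// The persistent node is created per service; each provider registers an
// ephemeral child node whose name (or payload) carries its address data.
class ZkPaths {
    static final String ROOT = "/orc-rpc"; // assumed root path

    static String servicePath(String serviceName) {
        return ROOT + "/" + serviceName;                            // persistent node per service
    }

    static String instancePath(String serviceName, String host, int port) {
        return servicePath(serviceName) + "/" + host + ":" + port;  // ephemeral child per provider
    }

    // Resolve "host:port" back into its parts when listing children.
    static String[] parseInstance(String childNodeName) {
        int i = childNodeName.lastIndexOf(':');
        return new String[] { childNodeName.substring(0, i), childNodeName.substring(i + 1) };
    }
}
```

Because the child nodes are ephemeral, they disappear automatically when a provider's session ends, which is what makes service offlining work without an explicit deregistration call.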

When obtaining the service, the client resolves the service address data by obtaining the temporary node list under the persistent node:

The client listens for service changes:

Nacos

Nacos is Alibaba's open-source microservice management middleware, providing service registration and discovery plus a configuration center, roughly equivalent to Eureka + Config in Spring Cloud.

Unlike Zookeeper, where registration and discovery are implemented by creating nodes, Nacos provides registration and discovery as first-class features, making it easier to use. The NamingService interface provides three methods: registerInstance, getAllInstances, and subscribe. registerInstance completes server-side service registration, getAllInstances completes client-side service retrieval, and subscribe completes client-side monitoring of service changes. I will not expand on them here; see the source code for details.

2 Service Provider

The registry and the RPC bootstarter are initialized in the OrcRpcAutoConfiguration class:

The startup process of the server is as follows:

RPC Bootstarter:

Since Spring has multiple containers (for example the web container and the core container) with parent-child relationships, only the top-level container should be handled, to avoid duplicate registration.

3 Service Consumer

The service consumer needs to create proxy objects for the dependent services before the application is started. There are many ways to do this, two of which are common:

  • The first is triggered by the Spring context initialization-completed event: scan all beans, find the fields annotated with OrcRpcConsumer in each bean, create a proxy object of the field's type, and set the proxy on the field. The proxy object is then responsible for establishing the server connection and initiating calls.
  • The second uses Spring's BeanFactoryPostProcessor, which can process bean definitions (BeanDefinition, i.e. configuration metadata). The Spring IoC container runs BeanFactoryPostProcessor to read BeanDefinitions before instantiating any other beans, at which point these BeanDefinitions can be modified or new ones added.

This implementation adopts the second approach; the process is as follows:

The main implementations of BeanFactoryPostProcessor are:

@Override
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
    this.beanFactory = beanFactory;
    postProcessRpcConsumerBeanFactory(beanFactory, (BeanDefinitionRegistry) beanFactory);
}

private void postProcessRpcConsumerBeanFactory(ConfigurableListableBeanFactory beanFactory,
                                               BeanDefinitionRegistry beanDefinitionRegistry) {
    String[] beanDefinitionNames = beanFactory.getBeanDefinitionNames();
    int len = beanDefinitionNames.length;
    for (int i = 0; i < len; i++) {
        String beanDefinitionName = beanDefinitionNames[i];
        BeanDefinition beanDefinition = beanFactory.getBeanDefinition(beanDefinitionName);
        String beanClassName = beanDefinition.getBeanClassName();
        if (beanClassName != null) {
            Class<?> clazz = ClassUtils.resolveClassName(beanClassName, classLoader);
            ReflectionUtils.doWithFields(clazz, new FieldCallback() {
                @Override
                public void doWith(Field field) throws IllegalArgumentException, IllegalAccessException {
                    parseField(field);
                }
            });
        }
    }

    Iterator<Entry<String, BeanDefinition>> it = beanDefinitions.entrySet().iterator();
    while (it.hasNext()) {
        Entry<String, BeanDefinition> entry = it.next();
        if (context.containsBean(entry.getKey())) {
            throw new IllegalArgumentException("Spring context already has a bean named " + entry.getKey());
        }
        beanDefinitionRegistry.registerBeanDefinition(entry.getKey(), entry.getValue());
        log.info("register OrcRpcConsumerBean definition: {}", entry.getKey());
    }
}

private void parseField(Field field) {
    OrcRpcConsumer orcRpcConsumer = field.getAnnotation(OrcRpcConsumer.class);
    if (orcRpcConsumer != null) {
        // Use the field's type together with the OrcRpcConsumer annotation to build a BeanDefinition
        OrcRpcConsumerBeanDefinitionBuilder beanDefinitionBuilder =
                new OrcRpcConsumerBeanDefinitionBuilder(field.getType(), orcRpcConsumer);
        BeanDefinition beanDefinition = beanDefinitionBuilder.build();
        beanDefinitions.put(field.getName(), beanDefinition);
    }
}

The main implementation of ProxyFactory:

public class JdkProxyFactory implements ProxyFactory {

    @Override
    public Object getProxy(ServiceMetadata serviceMetadata) {
        return Proxy.newProxyInstance(
                serviceMetadata.getClazz().getClassLoader(),
                new Class[] {serviceMetadata.getClazz()},
                new ClientInvocationHandler(serviceMetadata));
    }

    private class ClientInvocationHandler implements InvocationHandler {
        private ServiceMetadata serviceMetadata;

        public ClientInvocationHandler(ServiceMetadata serviceMetadata) {
            this.serviceMetadata = serviceMetadata;
        }

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            String serviceId = ServiceUtils.getServiceId(serviceMetadata);
            // Select a provider address through the load balancer
            ServiceURL service = InvocationServiceSelector.select(serviceMetadata);

            OrcRpcRequest request = new OrcRpcRequest();
            request.setMethod(method.getName());
            request.setParameterTypes(method.getParameterTypes());
            request.setParameters(args);
            request.setRequestId(UUID.randomUUID().toString());
            request.setServiceId(serviceId);

            OrcRpcResponse response = InvocationClientContainer
                    .getInvocationClient(service.getServerNet())
                    .invoke(request, service);
            if (response.getStatus() == RpcStatusEnum.SUCCESS) {
                return response.getData();
            } else if (response.getException() != null) {
                throw new OrcRpcException(response.getException().getMessage());
            } else {
                throw new OrcRpcException(response.getStatus().name());
            }
        }
    }
}

This implementation only uses JDK dynamic proxies, in JdkProxyFactory; for better performance you could also implement this with cglib or Javassist.

4 I/O module

The UML diagram is as follows:

The structure is clear: it is divided into three modules, the client call adaptation module, the server request-response adaptation module, and the Netty I/O service module.

The client invokes the adaptation module

This module is relatively simple. It mainly establishes connections to the server for client calls and stores them in a cache, so that subsequent calls to the same service avoid re-establishing a connection; once the connection is established, the call is initiated. Here is the implementation of DefaultInvocationClient:
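DefaultInvocationClient appears as an image in the original post. The connection-caching idea it describes can be sketched as follows; ConnectionCache is a hypothetical name, and the real class works with Netty channels rather than a generic type:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Cache one client connection per server address so that repeated calls to
// the same provider reuse the established connection.
class ConnectionCache<C> {
    private final Map<String, C> cache = new ConcurrentHashMap<>();
    private final Function<String, C> connector; // establishes a new connection

    ConnectionCache(Function<String, C> connector) {
        this.connector = connector;
    }

    // computeIfAbsent runs the connector at most once per address.
    C getOrConnect(String address) {
        return cache.computeIfAbsent(address, connector);
    }
}
```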

Server request response adaptation module

The server request-response module is also relatively simple. Based on the service name in the request, it obtains the service metadata from the cache, then obtains the invoked method and parameter-type information from the request and resolves the corresponding method by reflection. The bean is then retrieved from the Spring context and the call is made reflectively.
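The reflective dispatch just described can be sketched as below; ReflectiveDispatcher is a hypothetical helper, and in the real implementation the target bean comes from the Spring context rather than being passed in:

```java
import java.lang.reflect.Method;

// Look the method up by name and parameter types from the request,
// then invoke it on the service bean with the deserialized arguments.
class ReflectiveDispatcher {
    static Object dispatch(Object bean, String methodName,
                           Class<?>[] parameterTypes, Object[] args) throws Exception {
        Method method = bean.getClass().getMethod(methodName, parameterTypes);
        return method.invoke(bean, args);
    }
}
```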

Netty IO service module

The Netty IO service module is the core and slightly complicated. The client and server processes are as follows:

The focus is on four classes: NettyNetClient, NettyNetServer, NettyClientChannelRequestHandler, and NettyServerChannelRequestHandler. The UML diagram above and the flow diagram below explain their relationships and how a request is processed, so I will not expand on them here.

Let’s focus on codecs.

The technology selection section discussed the communication protocol and defined a private RPC protocol:

+--------+---------+--------+----------------+----------------+
| magic  | version | type   | content length | content byte[] |
+--------+---------+--------+----------------+----------------+
| 1 byte | 1 byte  | 1 byte | 4 bytes        | variable       |
+--------+---------+--------+----------------+----------------+
  • The first byte is the magic number defined as 0X35.
  • The second byte represents the protocol version number.
  • The third byte is the request type, where 0 represents the request and 1 represents the response.
  • The fourth field (4 bytes) is the message length; the bytes that follow it are the message content.

The implementation of encoder is as follows:

@Override
protected void encode(ChannelHandlerContext channelHandlerContext, ProtocolMsg protocolMsg, ByteBuf byteBuf) throws Exception {
    // Write the magic number
    byteBuf.writeByte(ProtocolConstant.MAGIC);
    // Write the protocol version
    byteBuf.writeByte(ProtocolConstant.DEFAULT_VERSION);
    // Write the request type
    byteBuf.writeByte(protocolMsg.getMsgType());
    // Write the message length
    byteBuf.writeInt(protocolMsg.getContent().length);
    // Write the message content
    byteBuf.writeBytes(protocolMsg.getContent());
}

The implementation of the decoder is as follows:
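The decoder itself appears as an image in the original post. As a hedged sketch of the same logic over java.nio.ByteBuffer (a Netty ByteToMessageDecoder would do the equivalent over ByteBuf): wait until the 7-byte header plus the announced content length has been buffered, then slice out one complete message.

```java
import java.nio.ByteBuffer;

class FrameDecoderSketch {
    static final int HEADER_LEN = 7; // magic + version + type + 4-byte length

    // Returns the content of one complete frame, or null if more bytes are needed.
    static byte[] tryDecode(ByteBuffer in) {
        if (in.remaining() < HEADER_LEN) return null;   // header not complete yet
        in.mark();                                      // remember position in case the body is incomplete
        byte magic = in.get();
        if (magic != 0x35) throw new IllegalStateException("bad magic: " + magic);
        in.get(); // skip version
        in.get(); // skip type
        int length = in.getInt();
        if (in.remaining() < length) {                  // body not fully arrived: rewind and wait
            in.reset();
            return null;
        }
        byte[] content = new byte[length];
        in.get(content);
        return content;
    }
}
```

This is exactly the half-packet handling that solves the sticky/split packet problem described in the technology selection section: incomplete frames leave the buffer untouched until more bytes arrive.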

Testing

On my 13-inch MacBook Pro (4-core i5, 16 GB RAM), with the Nacos registry, one server, one client, and the round-robin load balancing strategy, I benchmarked with Apache ab.

With 8 threads issuing 10,000 requests in total, all requests completed in 18 seconds, at roughly 550 QPS:

With 100 threads issuing 10,000 requests in total, all requests completed in 13.8 seconds, at roughly 724 QPS:

Summary

In the course of implementing this RPC framework, I relearned a lot, such as communication protocols and I/O frameworks. I also studied gRPC, currently the hottest RPC framework, and read a lot of related source code, which was very rewarding. Going forward, I will continue to maintain and upgrade this framework, for example by introducing a circuit-breaking and degradation mechanism, so as to keep learning and improving.

The original link

This article is the original content of Aliyun and shall not be reproduced without permission.