Hands-on implementation of RPC

Recently I used my spare time to build a simple RPC demo. Along the way I ran into some problems and learned a few things, and I would like to share the process here.

Preparation

RPC is common and familiar in enterprise development, but to build an RPC framework ourselves we first need some basic theory.

  • RPC is network communication (communication between processes), which is slower than communication inside a single process.
  • What the network transmits is bytes, rather than characters or some other medium.
  • Understand, or better master, the use of dynamic proxies.

The essence is transmitting information: I (the consumer) tell you (the server) what I know through network communication, and you send the final result back to me.

Implementation

With that in mind, let’s take a look at how RPC is used. The service provider needs to expose the service, and the service consumer needs to reference the service it requires. The process is shown below, and the rest of this article follows it.

Server exposes service –> consumer references service –> dynamic proxy –> serialization –> network request –> server processing –> serialized response –> consumer returns result.

Service exposure

Service exposure means publishing the service to a registry (ZooKeeper here) so that consumers can find it. Once consumers get this information, they can connect to the server and initiate requests. Exposure requires publishing the basic information of the service. The main items listed here are the service host, the listening port, the service interface, the serialization method, the weight, and the version. The code is as follows.

1. Information exposed by the service

    public class Provider implements Serializable, Cloneable {
        private String serviceName;
        private String host;
        private Integer port;
        private String version;
        private Integer weight;
        private String serialization;

        // getters and setters omitted
    }

2. The process of exposing a service. Taking ZooKeeper as an example, each service is added to ZK as a node.

    @Override
    public void registerService(List<Provider> providerList) {
        assert zkClient != null;
        providerList.parallelStream().forEach(provider -> {
            String host = provider.getHost();
            Integer port = provider.getPort();
            String serviceName = provider.getServiceName();
            String version = provider.getVersion();
            Integer weight = provider.getWeight();
            // Persistent parent node, e.g. /fuck/top.huzhurong.fuck.UserService/provider
            String serverPath = root_path + "/" + serviceName + root_provider;
            if (!zkClient.exists(serverPath)) {
                zkClient.createPersistent(serverPath, true);
            }
            String finalInfo = host + split + port + split + serviceName + split + version + split + weight + split + provider.getSerialization();
            String path = serverPath + "/" + finalInfo;
            if (!zkClient.exists(path)) {
                log.info("Register service :{} to ZooKeeper", serverPath);
                // Ephemeral node: ZooKeeper removes it when the provider's session ends
                zkClient.createEphemeral(path);
            } else {
                log.warn("Service :{} has been registered", serverPath);
            }
        });
    }

You can then run the zkCli command to check the registered services in ZooKeeper.

[zk: localhost:2181(CONNECTED) 0] ls /fuck/top.huzhurong.fuck.UserService/provider
[]
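A consumer that reads these nodes has to split the node name back into its fields. The following is a minimal sketch of that round trip, not the project's code: it assumes the separator `split` is ":" (its value is not shown above) and uses a hypothetical `ProviderInfo` holder in place of `Provider`.

```java
import java.util.regex.Pattern;

// Hypothetical holder for illustration; the article's Provider class plays this role.
final class ProviderInfo {
    // Assumed separator: the real value of `split` is not shown in the article.
    static final String SPLIT = ":";

    final String host;
    final int port;
    final String serviceName;
    final String version;
    final int weight;
    final String serialization;

    ProviderInfo(String host, int port, String serviceName,
                 String version, int weight, String serialization) {
        this.host = host;
        this.port = port;
        this.serviceName = serviceName;
        this.version = version;
        this.weight = weight;
        this.serialization = serialization;
    }

    // Same field order as finalInfo in registerService.
    String toNodeName() {
        return String.join(SPLIT, host, String.valueOf(port), serviceName,
                version, String.valueOf(weight), serialization);
    }

    // Inverse of toNodeName: rejects names that do not have exactly six fields.
    static ProviderInfo fromNodeName(String nodeName) {
        String[] parts = nodeName.split(Pattern.quote(SPLIT));
        if (parts.length != 6) {
            throw new IllegalArgumentException("Unexpected node name: " + nodeName);
        }
        return new ProviderInfo(parts[0], Integer.parseInt(parts[1]), parts[2],
                parts[3], Integer.parseInt(parts[4]), parts[5]);
    }

    public static void main(String[] args) {
        ProviderInfo provider = new ProviderInfo("127.0.0.1", 8080,
                "top.huzhurong.fuck.UserService", "0.0.1", 10, "jdk");
        String nodeName = provider.toNodeName();
        System.out.println(nodeName);
        System.out.println(fromNodeName(nodeName).serviceName);
    }
}
```

Whatever separator is used, it must not occur inside any field, otherwise the node name cannot be parsed back unambiguously.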

Consumer-side service references and dynamic proxies

On the consumer side, a service is usually referenced through a custom Spring tag. The usage is as follows.

	<fuck:reference id="test" interface="top.huzhurong.fuck.UserService" version="0.0.1"/>

This references version 0.0.1 of the top.huzhurong.fuck.UserService service. With Spring, the reference has to be a dynamic proxy that goes over the network and invokes the interface on the server, so it cannot be configured like an ordinary bean. Generally a FactoryBean is configured instead, because the bean returned from getObject can be customized (most frameworks integrate with Spring through a FactoryBean as well).

A FactoryBean can be used as follows.

    public class ProxyBean implements FactoryBean<Object>, InitializingBean {
        private Class<?> name;
        private Object object;

        @Override
        public Object getObject() {
            return object;
        }

        @Override
        public Class<?> getObjectType() {
            return name;
        }

        @Override
        public void afterPropertiesSet() {
            this.build();
        }

        private void build() {
            // The dynamic proxy generates the proxy bean
            this.object = Proxy.newProxyInstance(this.getClass().getClassLoader(), new Class<?>[]{this.name}, (proxy, method, args) -> {
                // Customize the call inside invoke: go over TCP/HTTP, etc.
                if (method.getName().equalsIgnoreCase("name")) {
                    return "Call the name method";
                }
                if (method.getName().equalsIgnoreCase("toString")) {
                    return "Call toString method";
                }
                return "111";
            });
        }
    }

The above is just a simple illustration. Referencing a service actually involves the following steps:

  • Register the consumer node with ZooKeeper
  • Get the list of servers
  • Subscribe to the service interface
  • Set up the dynamic proxy

Space is limited, so I will only cover setting up the dynamic proxy. The other three steps are pretty simple.

    Object object = Proxy.newProxyInstance(this.getClass().getClassLoader(),
            new Class<?>[]{Class.forName(this.interfaceName)},
            new FuckRpcInvocationHandler(this));

    class FuckRpcInvocationHandler implements InvocationHandler {
        // fields
        // constructor

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            // Get the list of providers
            List<Provider> all = ProviderSet.getAll(this.className);
            // Soft load balancing
            Provider provider = loadBalance.getProvider(all);
            String info = provider.buildIfno();
            SocketChannel channel = ChannelMap.get(info);
            if (channel == null) {
                // Establish a TCP connection
                Client client = new NettyClient(provider, this.serialization);
                client.connect(provider.getHost(), provider.getPort());
                channel = ChannelMap.get(info);
            }

            // Building `request` from method and args is omitted here
            SocketChannel finalChannel = channel;
            Future<Response> submit = TempResultSet.executorService.submit(() -> {
                // Write the request to the TCP channel
                finalChannel.writeAndFlush(request);
                // Poll until the response arrives; a latch such as CountDownLatch could be used instead
                for (; ; ) {
                    Response response = TempResultSet.get(request.getRequestId());
                    if (response != null) {
                        return response;
                    }
                }
            });
            Response response = submit.get(this.timeout, TimeUnit.SECONDS);
            // Assumes Response has a getter matching the setObject(...) call used on the server side
            return response.getObject();
        }
    }

The dynamic proxy mainly does four things: it gets the list of providers, applies soft load balancing, establishes the TCP connection, and performs the communication.
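The `loadBalance.getProvider(all)` call is not shown in the article. One common way to do soft load balancing with the `weight` field is weighted random selection. The sketch below is only an assumption about how such a step could look, with a hypothetical `WeightedNode` type standing in for `Provider`.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Random;

final class WeightedRandomBalancer {
    // Hypothetical stand-in for Provider: only the fields the balancer needs.
    static final class WeightedNode {
        final String address;
        final int weight;

        WeightedNode(String address, int weight) {
            this.address = address;
            this.weight = weight;
        }
    }

    private final Random random;

    WeightedRandomBalancer(Random random) {
        this.random = random;
    }

    // Picks a node with probability proportional to its weight.
    // Negative weights are treated as zero, so such nodes are never picked.
    WeightedNode select(List<WeightedNode> nodes) {
        int total = 0;
        for (WeightedNode node : nodes) {
            total += Math.max(node.weight, 0);
        }
        if (total <= 0) {
            throw new IllegalArgumentException("No node with a positive weight");
        }
        int point = random.nextInt(total); // 0 <= point < total
        for (WeightedNode node : nodes) {
            point -= Math.max(node.weight, 0);
            if (point < 0) {
                return node;
            }
        }
        throw new IllegalStateException("unreachable: point is always below total");
    }

    public static void main(String[] args) {
        List<WeightedNode> nodes = Arrays.asList(
                new WeightedNode("10.0.0.1:8080", 1),
                new WeightedNode("10.0.0.2:8080", 3));
        WeightedRandomBalancer balancer = new WeightedRandomBalancer(new Random());
        System.out.println("selected: " + balancer.select(nodes).address);
    }
}
```

With weights 1 and 3, the second node should receive roughly three quarters of the calls over many requests.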

The dynamic proxy only enters the invoke method when a method is actually called, not when the proxy is instantiated, and every call on the proxy goes through invoke.
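This behavior is easy to check with the JDK alone. The sketch below counts how often invoke runs; the `Greeter` interface and the counter are hypothetical names used only for this demo.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.concurrent.atomic.AtomicInteger;

final class ProxyCallDemo {
    // Hypothetical service interface used only for this demo.
    interface Greeter {
        String greet(String name);
    }

    // Counts how many times the handler's invoke method has run.
    static final AtomicInteger INVOCATIONS = new AtomicInteger();

    static Greeter newGreeterProxy() {
        InvocationHandler handler = (proxy, method, args) -> {
            INVOCATIONS.incrementAndGet();
            // A real RPC handler would serialize the call and send it over the network here.
            return "hello, " + args[0];
        };
        return (Greeter) Proxy.newProxyInstance(
                Greeter.class.getClassLoader(), new Class<?>[]{Greeter.class}, handler);
    }

    public static void main(String[] args) {
        Greeter greeter = newGreeterProxy();
        System.out.println(INVOCATIONS.get()); // prints 0: creating the proxy does not call invoke
        greeter.greet("a");
        greeter.greet("b");
        System.out.println(INVOCATIONS.get()); // prints 2: one invoke per method call
    }
}
```

Methods inherited from Object such as toString also arrive in invoke, which is why the ProxyBean example earlier checks for "toString" explicitly.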

Serialization and network requests

There are plenty of articles on serialization; this demo only supports Protostuff and JDK serialization. Serialization is not the hard part once it is wrapped behind a small interface, so the focus here is on handling network transmission.
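As a rough idea of what such a wrapper can look like, here is a minimal sketch of the JDK variant. Only `deSerialize(byte[], Class)` appears in the article's code; the interface name and the `serialize` method are assumptions made for this example.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

final class JdkSerialization implements Serialization {
    @Override
    public byte[] serialize(Serializable value) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        // Closing the ObjectOutputStream flushes everything into the buffer.
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    @Override
    public <T> T deSerialize(byte[] bytes, Class<T> type) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return type.cast(in.readObject());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Serialization serialization = new JdkSerialization();
        byte[] bytes = serialization.serialize("hello rpc");
        System.out.println(bytes.length + " bytes on the wire");
        System.out.println(serialization.deSerialize(bytes, String.class));
    }
}

// Assumed shape of the wrapper; a Protostuff implementation would plug in the same way.
interface Serialization {
    byte[] serialize(Serializable value);

    <T> T deSerialize(byte[] bytes, Class<T> type);
}
```

Keeping both directions behind one interface is what lets the provider advertise its serialization method as a plain string in the registry. Note that JDK deserialization of untrusted bytes is unsafe, so this is only suitable for a demo or a trusted network.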

Once serialization is solved, the next step is network transmission. We all know that the network transmits bytes, but doing the transmission by hand is a headache. Netty lowers the barrier to entry: its simple API can be picked up quickly, and if you have not used it before, Netty’s examples are a good way to get started.

Handling network transmission has two parts: one is the protocol, and the other is how to match the result of a processed request to the correct request object.

1. Protocol handling is simple: the custom protocol is the conventional length + data frame. This lets us process messages quickly without getting tangled up in protocol correctness.

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> list) {
        if (byteBuf.readableBytes() <= HEAD_LENGTH) {
            return;
        }
        byteBuf.markReaderIndex(); // Mark the current read position
        int dataLength = byteBuf.readInt();
        if (byteBuf.readableBytes() < dataLength) {
            byteBuf.resetReaderIndex(); // Not enough data yet: rewind and wait for more
            return;
        }
        byte[] dataArray = new byte[dataLength];
        byteBuf.readBytes(dataArray);
        Request request = serialization.deSerialize(dataArray, Request.class);
        if (log.isDebugEnabled()) {
            log.debug("Received consumer request :{}, request content :{}", ctx.channel().toString(), request);
        }
        list.add(request);
    }
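The article only shows the decoding side. The following sketch shows both directions of the same length + data frame. It uses java.nio.ByteBuffer instead of Netty's ByteBuf so that it runs without dependencies, and it assumes a 4-byte int length header, consistent with the readInt() call above. The class and method names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class LengthPrefixedFrames {
    static final int HEAD_LENGTH = 4; // size of the int length header

    // Encoder side: prepend the payload length to the payload.
    static byte[] encode(byte[] payload) {
        return ByteBuffer.allocate(HEAD_LENGTH + payload.length)
                .putInt(payload.length)
                .put(payload)
                .array();
    }

    // Decoder side: returns one payload, or null if the buffer does not hold a full frame yet.
    // On null the buffer position is unchanged, so the caller can retry once more bytes arrive.
    static byte[] decode(ByteBuffer in) {
        if (in.remaining() < HEAD_LENGTH) {
            return null;
        }
        in.mark(); // remember where the frame starts
        int dataLength = in.getInt();
        if (dataLength < 0) {
            throw new IllegalStateException("Negative frame length: " + dataLength);
        }
        if (in.remaining() < dataLength) {
            in.reset(); // half a frame: rewind to the mark and wait
            return null;
        }
        byte[] data = new byte[dataLength];
        in.get(data);
        return data;
    }

    public static void main(String[] args) {
        byte[] frame = encode("ping".getBytes(StandardCharsets.UTF_8));
        // Only the first 6 of 8 bytes have "arrived": no frame can be decoded yet.
        System.out.println(decode(ByteBuffer.wrap(frame, 0, 6)));
        // The whole frame has arrived.
        System.out.println(new String(decode(ByteBuffer.wrap(frame)), StandardCharsets.UTF_8));
    }
}
```

The mark/reset pair plays the same role as markReaderIndex/resetReaderIndex in the Netty decoder: TCP delivers a byte stream, so a read may contain half a frame and the decoder must be able to back off.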

2. How to return the processed result to the correct request. As we all know, a request needs a requestId (RI) to identify it; the same holds in our RPC, to make sure a request and its response belong together.

    public class Request implements Serializable {
        private String requestId;       // Request identifier
        private String serviceName;     // Service name
        private String methodName;      // Method name; a Method object cannot be serialized
        private Class<?>[] parameters;  // Parameter types, used to look up the method to execute
        private Object[] args;          // Actual arguments
    }

3. Service processing. You can create a separate thread pool for this rather than running directly on Netty’s worker I/O thread pool.

    // In the Netty handler: hand the request over to a separate thread pool
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, Serializable serializable) {
        if (serializable instanceof Request) {
            Request request = (Request) serializable;
            responseTask.execute(new ResponseTask(request, channelHandlerContext, this.applicationContext));
        }
    }

    // In ResponseTask: look up the service, invoke the method, write the response
    @Override
    public void run() {
        String serviceName = request.getServiceName();
        String methodName = request.getMethodName();
        Class<?>[] parameters = request.getParameters();
        Object[] args = request.getArgs();
        Response response = new Response();
        response.setRequestId(request.getRequestId());
        response.setSuccess(false);
        try {
            // Get the service
            Object service = ServiceCache.getService(serviceName);
            if (service == null) {
                Class<?> aClass = ClassUtils.forName(serviceName, ClassUtils.getDefaultClassLoader());
                service = applicationContext.getBean(aClass);
                ServiceCache.put(serviceName, service);
            }
            // Get the method
            Method method = service.getClass().getDeclaredMethod(methodName, parameters);
            Object invoke = method.invoke(service, args);
            System.out.println("invoke:" + invoke);
            response.setSuccess(true);
            response.setObject(invoke);
        } catch (ClassNotFoundException | IllegalAccessException e) {
            response.setException(e);
        } catch (NoSuchMethodException e) {
            e.printStackTrace();
            response.setException(e);
        } catch (InvocationTargetException e) {
            e.printStackTrace();
            response.setException(e.getTargetException());
        }
        // Write to channel
        channelHandlerContext.writeAndFlush(response);
    }
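The reason for the separate pool is that a slow service method would otherwise block the I/O thread and stall every connection it serves. Below is a minimal sketch of the hand-off using only java.util.concurrent; the pool size of 8 and the thread name "rpc-business" are arbitrary assumptions, not values from the project.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

final class BusinessPoolDemo {
    // Fixed-size pool for service calls; size and thread name are assumptions for this demo.
    static ExecutorService newBusinessPool() {
        return Executors.newFixedThreadPool(8, runnable -> {
            Thread thread = new Thread(runnable, "rpc-business");
            thread.setDaemon(true);
            return thread;
        });
    }

    // Called from the I/O thread: only enqueues the work and returns immediately.
    static Future<String> handOff(ExecutorService pool, Callable<String> serviceCall) {
        return pool.submit(serviceCall);
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = newBusinessPool();
        // The "service call" here just reports which thread executed it.
        Future<String> result = handOff(pool, () -> Thread.currentThread().getName());
        System.out.println("caller thread: " + Thread.currentThread().getName());
        System.out.println("service ran on: " + result.get(5, TimeUnit.SECONDS));
        pool.shutdown();
    }
}
```

In the RPC server the caller would be a Netty worker thread, and the task would end by writing the response back to the channel, as ResponseTask does above.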

The consumer returns the result

When the server writes the response, as long as the network is healthy it reaches the client. There we simply put it into a Map, and the consuming thread keeps looking for its entry in that Map.

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, Serializable serializable) {
        if (serializable instanceof Response) {
            Response response = (Response) serializable;
            // Put the response into the map
            TempResultSet.put(response.getRequestId(), response);
        }
    }

    // Polling loop on the calling side: read the map until the response shows up
    for (; ; ) {
        Response response = TempResultSet.get(request.getRequestId());
        if (response != null) {
            return response;
        }
    }
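The polling loop works, but it keeps a thread spinning at full speed while it waits. One alternative, which is not what the project does, is to keep a map from requestId to CompletableFuture and let the network thread complete the future. A minimal sketch, with `PendingResponses` as a hypothetical name:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class PendingResponses<R> {
    private final ConcurrentMap<String, CompletableFuture<R>> pending = new ConcurrentHashMap<>();

    // Caller side: register before writing the request to the channel.
    CompletableFuture<R> register(String requestId) {
        CompletableFuture<R> future = new CompletableFuture<>();
        pending.put(requestId, future);
        return future;
    }

    // Network side: called when a response arrives (e.g. from channelRead0).
    // Returns false if nobody is waiting, for example because the caller already timed out.
    boolean complete(String requestId, R response) {
        CompletableFuture<R> future = pending.remove(requestId);
        return future != null && future.complete(response);
    }

    // Caller side: block without spinning until the response arrives or the timeout expires.
    R await(String requestId, CompletableFuture<R> future, long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
        try {
            return future.get(timeout, unit);
        } finally {
            pending.remove(requestId); // do not leak entries after a timeout
        }
    }

    // Number of requests still waiting for a response.
    int size() {
        return pending.size();
    }

    public static void main(String[] args) throws Exception {
        PendingResponses<String> responses = new PendingResponses<>();
        CompletableFuture<String> future = responses.register("req-1");
        // Simulate the network thread delivering the response.
        new Thread(() -> responses.complete("req-1", "pong")).start();
        System.out.println(responses.await("req-1", future, 1, TimeUnit.SECONDS));
    }
}
```

Removing the entry in the finally block matters: without it, every request that times out would stay in the map forever.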

At this point a simple RPC call is ready to go.

Summary

A simple RPC is now up and running, but there are many things that could still be improved, such as interception (chain of responsibility + SPI) or the use of the observer pattern. Along the way I learned the basics of how RPC is used, though I still need to understand thread pools better; their use here is quite crude. Still, I was happy to write it in a few days. The project is here: fuck-RPC, a simple implementation. Feel free to try it out.

2018-12-05