Remote Procedure Call (RPC) lets a program invoke a procedure in another process or on another machine as if it were a local call, so the remote nature of the call is transparent to the caller. Popular open source RPC frameworks include Alibaba's Dubbo, Google's gRPC, and Twitter's Finagle. The RPC framework designed in this article mainly follows Alibaba's Dubbo, with Netty as the technical base of the architecture: it handles the high-performance network communication that makes efficient remote calls possible.

Dubbo’s architecture and its relationship with Spring

The composition and structure of Dubbo were already discussed in the previous article “Talk about Jingdong’s Service Framework”. Its nodes are:

Node        Description
Provider    The service provider that exposes the service
Consumer    The service consumer that invokes the remote service
Registry    The registry for service registration and discovery
Monitor     The monitoring center that collects statistics on invocation counts and invocation time
Container   The container in which the service runs

In addition, one of the most convenient things about Dubbo is how easily it integrates with Spring. Dubbo's configuration has evolved in step with Spring's: from the earliest XML form to annotations and then automatic assembly, Dubbo has kept simplifying the development process to improve development efficiency.

Dubbo’s workflow in the Spring framework:

1. Start Spring’s IOC container.
2. Register the services in the registry (ZooKeeper).
3. The registry immediately notifies the consumer.
5. The consumer calls the provider directly, using the service address obtained from the registry. Once a provider has been called, its address is cached.
6. The monitor counts how many times the consumer calls the provider.

The key to RPC implementation

1. Serialization and deserialization

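In a remote procedure call, the client and the server are different processes, and sometimes they are even written in different languages, for example a Java client and a C++ server. The call parameters therefore cannot be passed as in-memory objects: the client has to convert them into a byte stream, and the server has to convert that byte stream back into objects it can read. This process is called serialization and deserialization. In the same way, the value returned from the server also needs to be serialized and deserialized. For serialization, we choose Netty’s own object serializer, which is based on Java serialization.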
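As a rough illustration of the round trip described above, the following sketch uses plain JDK serialization without any networking. The `Call` class and the `serialize`/`deserialize` helpers are hypothetical names introduced only for this example; they are not part of the framework built later.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Arrays;

public class SerializationDemo {
    // Hypothetical call description, used only for this illustration
    static class Call implements Serializable {
        private static final long serialVersionUID = 1L;
        final String methodName;
        final Object[] args;

        Call(String methodName, Object[] args) {
            this.methodName = methodName;
            this.args = args;
        }
    }

    // Serialization: object -> bytes that could be written to a socket
    static byte[] serialize(Object obj) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        return bytes.toByteArray();
    }

    // Deserialization: bytes received from the peer -> object
    static Object deserialize(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] wire = serialize(new Call("add", new Object[]{10, 5}));
        Call received = (Call) deserialize(wire);
        System.out.println(received.methodName + Arrays.toString(received.args)); // add[10, 5]
    }
}
```

Java serialization only works when both sides run on the JVM and share the same classes; a cross-language setup would need a language-neutral format instead.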

2. Data network transmission

With serialization solved, what remains is how to get the parameters to the provider. The network layer has to carry the serialized bytes to the server and then carry the serialized result of the call back to the client. Most RPC frameworks use TCP as the transport protocol, although UDP can also be used. Netty supports both TCP and UDP, lets us define arbitrary custom protocols on top of them, and builds its high-performance networking on NIO, which makes it an attractive choice for Java programmers.
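TCP delivers a continuous byte stream with no message boundaries, so a custom protocol has to say where one message ends and the next begins. A common approach, and the idea behind the 4-byte length field configured in the Netty pipeline later in this article, is to prefix each message with its length. The sketch below shows that idea with plain JDK streams; `writeFrame`, `readFrame`, and `decodeAll` are hypothetical helper names, not Netty APIs.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class FramingDemo {
    // Write one frame: a 4-byte big-endian length followed by the payload
    static void writeFrame(DataOutputStream out, byte[] payload) throws IOException {
        out.writeInt(payload.length);
        out.write(payload);
    }

    // Read one frame: read the length first, then exactly that many bytes
    static byte[] readFrame(DataInputStream in) throws IOException {
        int length = in.readInt();
        byte[] payload = new byte[length];
        in.readFully(payload);
        return payload;
    }

    // Split a byte stream that contains several frames back into messages
    static List<String> decodeAll(byte[] stream) throws IOException {
        List<String> messages = new ArrayList<>();
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(stream));
        while (in.available() > 0) {
            messages.add(new String(readFrame(in), StandardCharsets.UTF_8));
        }
        return messages;
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream wire = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(wire);
        writeFrame(out, "hello".getBytes(StandardCharsets.UTF_8));
        writeFrame(out, "add(10, 5)".getBytes(StandardCharsets.UTF_8));
        System.out.println(decodeAll(wire.toByteArray())); // [hello, add(10, 5)]
    }
}
```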

3. Tell the registry which service to call

Parameter serialization and network transmission are now in place, but one problem remains: which function or method does the consumer want to call? We can think of each such function or method as a service. These services are registered in the registry, and a remote call can only be made once the consumer has told the registry which service it wants. So the request must carry not only the parameters of the service to be invoked, but also information identifying the service itself.

Architecture of a simple RPC framework

Dubbo's core consists of four parts: the Registry, the Provider, the Consumer, and the Monitor. For convenience, this implementation drops the monitoring module and writes the Provider module and the Registry module together, using a simple IOC container of its own to instantiate the service providers.

For more information on socket programming with Netty, see the Netty website or my previous blog post, Netty Coding practices and Channel Life Cycle. Netty coding techniques are not the focus of this article.

RPC framework coding implementation

First, introduce the following dependencies (Netty + Lombok):

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.6.Final</version>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.16.8</version>
</dependency>

1. Registry and Provider

The directory structure is as follows:

│  pom.xml
│
└─src
    └─main
        ├─java
        │  └─edu
        │      └─xpu
        │          └─rpc
        │              ├─api
        │              │      IRpcCalc.java
        │              │      IRpcHello.java
        │              │
        │              ├─core
        │              │      InvokerMessage.java
        │              │
        │              ├─provider
        │              │      RpcCalcProvider.java
        │              │      RpcHelloProvider.java
        │              │
        │              └─registry
        │                      MyRegistryHandler.java
        │                      RpcRegistry.java
        │
        └─resources

IRpcCalc.java and IRpcHello.java are the two service interfaces. IRpcCalc.java simulates a business service that performs addition, subtraction, multiplication, and division:

public interface IRpcCalc {
    // Addition
    int add(int a, int b);
    // Subtraction
    int sub(int a, int b);
    // Multiplication
    int mul(int a, int b);
    // Division
    int div(int a, int b);
}

IRpcHello.java is used to test whether the service is available:

public interface IRpcHello {
    String hello(String name);
}

At this point the API module is defined: just two very simple interfaces. Next, we need to decide on the transport rules, also known as the transport protocol. The protocol content is custom, which is where Netty's flexibility shows its advantage.

Create an InvokerMessage class that contains the service name, the name of the method to call, the parameter type list, and the parameter values. This is the packet of our custom protocol:

import java.io.Serializable;
import lombok.Data;

@Data
public class InvokerMessage implements Serializable {
    private String className;   // Service name
    private String methodName;  // Which method is called
    private Class<?>[] params;  // Parameter type list
    private Object[] values;    // Parameter values
}

By defining such a protocol class, we can know which service we need to call, which method in the service, and the list of arguments (parameter type + parameter value) that the method needs to pass. This information is passed correctly to get the correct call return value.
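The parameter types matter because a Java method is identified by its name together with its parameter types. The following sketch, using only JDK reflection, shows how the four pieces of information in such a message are enough to pick and invoke the right method, even among overloads. `Calculator` and `dispatch` are hypothetical names for illustration only.

```java
import java.lang.reflect.Method;

public class DispatchDemo {
    // Hypothetical service with two overloads of the same method name
    public static class Calculator {
        public int add(int a, int b) { return a + b; }
        public String add(String a, String b) { return a + b; }
    }

    // Resolve the method by name and parameter types, then invoke it with the values
    static Object dispatch(Object service, String methodName, Class<?>[] params, Object[] values)
            throws ReflectiveOperationException {
        Method method = service.getClass().getMethod(methodName, params);
        return method.invoke(service, values);
    }

    public static void main(String[] args) throws Exception {
        Calculator calc = new Calculator();
        // Same method name, different parameter types: each call reaches a different overload
        System.out.println(dispatch(calc, "add",
                new Class<?>[]{int.class, int.class}, new Object[]{10, 5}));      // 15
        System.out.println(dispatch(calc, "add",
                new Class<?>[]{String.class, String.class}, new Object[]{"a", "b"})); // ab
    }
}
```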

Create a concrete implementation class for these two services. IRpcHello’s implementation class is as follows:

public class RpcHelloProvider implements IRpcHello {
    public String hello(String name) {
        return "Hello, " + name + "!";
    }
}

The IRpcCalc implementation class is as follows:

public class RpcCalcProvider implements IRpcCalc {
    @Override
    public int add(int a, int b) {
        return a + b;
    }

    @Override
    public int sub(int a, int b) {
        return a - b;
    }

    @Override
    public int mul(int a, int b) {
        return a * b;
    }

    @Override
    public int div(int a, int b) {
        return a / b;
    }
}

The main function of the Registry is to register the service name and service reference of every provider into a container and publish them. For convenience, the interface class name is used directly as the service name, assuming each service has only one implementation class. The Registry has to start a service that can be reached from outside, so it acts as a server and exposes a port. Create the RpcRegistry class in RpcRegistry.java:

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;

public class RpcRegistry {
    private final int port;

    public RpcRegistry(int port) {
        this.port = port;
    }

    public void start() {
        NioEventLoopGroup bossGroup = new NioEventLoopGroup();
        NioEventLoopGroup workGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            // Frame codecs that handle TCP packet splitting and sticking
                            pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
                            pipeline.addLast(new LengthFieldPrepender(4));
                            // Object serialization codecs
                            pipeline.addLast("encoder", new ObjectEncoder());
                            pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
                            // Our own business logic
                            pipeline.addLast(new MyRegistryHandler());
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);
            ChannelFuture channelFuture = serverBootstrap.bind(this.port).sync();
            System.out.println("RPC Registry start listen at " + this.port);
            channelFuture.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        new RpcRegistry(8080).start();
    }
}

Now we just need to implement our own Handler. Create MyRegistryHandler.java with the following content:

import edu.xpu.rpc.core.InvokerMessage;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.io.File;
import java.lang.reflect.Method;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

public class MyRegistryHandler extends ChannelInboundHandlerAdapter {
    // The registry needs a container to store the services
    public static ConcurrentHashMap<String, Object> registryMap = new ConcurrentHashMap<>();
    // Names of the classes found in the provider package
    private static final List<String> classCache = new ArrayList<>();

    // By convention, all service implementation classes live under edu.xpu.rpc.provider
    public MyRegistryHandler() {
        scanClass("edu.xpu.rpc.provider");
        doRegister();
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Object result = new Object();
        InvokerMessage request = (InvokerMessage) msg;
        String serverClassName = request.getClassName();
        // Look up the service object by name and invoke the requested method via reflection
        if (registryMap.containsKey(serverClassName)) {
            Object clazz = registryMap.get(serverClassName);
            Method method = clazz.getClass().getMethod(request.getMethodName(), request.getParams());
            result = method.invoke(clazz, request.getValues());
            System.out.println("request=" + request);
            System.out.println("result=" + result);
        }
        // Close the connection only after the response has been written
        ctx.writeAndFlush(result).addListener(ChannelFutureListener.CLOSE);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    // Recursively collect the class names under the given package
    private void scanClass(String packageName) {
        ClassLoader classLoader = this.getClass().getClassLoader();
        URL url = classLoader.getResource(packageName.replaceAll("\\.", "/"));
        File dir = new File(url.getFile());
        File[] files = dir.listFiles();
        for (File file : files) {
            if (file.isDirectory()) {
                scanClass(packageName + "." + file.getName());
            } else {
                String className = packageName + "." + file.getName().replace(".class", "").trim();
                classCache.add(className);
            }
        }
    }

    // Instantiate every scanned class and register it under its interface name
    private void doRegister() {
        if (classCache.size() == 0) return;
        for (String className : classCache) {
            try {
                Class<?> clazz = Class.forName(className);
                // The first implemented interface is used as the service name
                Class<?> anInterface = clazz.getInterfaces()[0];
                registryMap.put(anInterface.getName(), clazz.getDeclaredConstructor().newInstance());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

Here a simple IOC container is implemented through reflection. It first recursively scans the classes under the provider package and puts an instance of each class into the container as a service object. Since the container is a Map, the name of the interface each class implements is used as the service name (the key) and the service object as the value. Given the service name sent by the consumer, the corresponding service can be found. At this point, the Registry and Provider have been written.
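The registration step can be tried in isolation, without Netty or package scanning. The following is a minimal sketch under the same assumption as above, namely that each service class implements exactly one service interface; `MiniContainerDemo`, `Greeter`, and `GreeterImpl` are hypothetical names used only here.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class MiniContainerDemo {
    // Hypothetical service interface and implementation for this illustration
    public interface Greeter { String greet(String name); }

    public static class GreeterImpl implements Greeter {
        public String greet(String name) { return "Hello, " + name + "!"; }
    }

    private final Map<String, Object> registry = new ConcurrentHashMap<>();

    // Instantiate the class reflectively and register it under its first interface's name
    void register(String implClassName) throws ReflectiveOperationException {
        Class<?> impl = Class.forName(implClassName);
        Class<?>[] interfaces = impl.getInterfaces();
        if (interfaces.length == 0) {
            throw new IllegalArgumentException(implClassName + " implements no interface");
        }
        registry.put(interfaces[0].getName(), impl.getDeclaredConstructor().newInstance());
    }

    // Returns the service object, or null when no such service is registered
    Object lookup(String serviceName) {
        return registry.get(serviceName);
    }

    public static void main(String[] args) throws Exception {
        MiniContainerDemo container = new MiniContainerDemo();
        container.register(GreeterImpl.class.getName());
        Greeter greeter = (Greeter) container.lookup(Greeter.class.getName());
        System.out.println(greeter.greet("Tom")); // Hello, Tom!
    }
}
```

Keying the map by interface name is what lets the consumer, which only knows the API interfaces, name the service without knowing the implementation class.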

2. Consumer

The directory structure is as follows:

└─src
    ├─main
    │  ├─java
    │  │  └─edu
    │  │      └─xpu
    │  │          └─rpc
    │  │              ├─api
    │  │              │      IRpcCalc.java
    │  │              │      IRpcHello.java
    │  │              │
    │  │              ├─consumer
    │  │              │  │  RpcConsumer.java
    │  │              │  │
    │  │              │  └─proxy
    │  │              │          RpcProxy.java
    │  │              │          RpcProxyHandler.java
    │  │              │
    │  │              …

Before looking at the client implementation, let’s review the RPC process. The interfaces in the API module are implemented only on the server side. Therefore, when a client calls an interface method defined in the API, it is actually making a network request to invoke a service on the server side. This network request is first received by the registry, which determines the location of the service to be invoked and forwards the request to the real service implementation; the server code is then executed and its return value is sent back to the client over the network. The whole process should be transparent to the client, as if it were calling a local method, so the client-side API must be proxied to hide the details of the network request.

As the flow above shows, for the call to be transparent to the user, a proxy class must be created to carry out the network request.
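The proxy idea can be seen on its own with the JDK's `java.lang.reflect.Proxy`. In the sketch below the network is replaced by a stand-in method that only reports what would be sent; `Echo`, `fakeRemoteCall`, and `create` are hypothetical names for illustration and differ from the real proxy class of this framework.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.Arrays;

public class ProxyDemo {
    // Hypothetical service interface; no implementation exists on the "client" side
    public interface Echo { String echo(String text); }

    // Stand-in for the network call: it only describes what would be sent
    static Object fakeRemoteCall(String service, String method, Object[] args) {
        return service + "#" + method + Arrays.toString(args);
    }

    // Create an object that implements the interface by routing every call to the handler
    @SuppressWarnings("unchecked")
    static <T> T create(Class<T> api) {
        InvocationHandler handler = (proxy, method, args) ->
                fakeRemoteCall(api.getSimpleName(), method.getName(), args);
        return (T) Proxy.newProxyInstance(api.getClassLoader(), new Class<?>[]{api}, handler);
    }

    public static void main(String[] args) {
        Echo echo = create(Echo.class);
        // Looks like a local call, but the handler decides what actually happens
        System.out.println(echo.echo("hi")); // Echo#echo[hi]
    }
}
```

The caller only ever sees the interface, so swapping the stand-in for a real network request does not change any calling code.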

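RpcProxy.java is as follows:

import edu.xpu.rpc.core.InvokerMessage;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

public class RpcProxy {
    @SuppressWarnings("unchecked")
    public static <T> T create(Class<T> clazz) {
        // clazz is the service interface to be proxied
        MethodProxy proxy = new MethodProxy(clazz);
        return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class<?>[]{clazz}, proxy);
    }

    private static class MethodProxy implements InvocationHandler {
        private Class<?> clazz;

        public MethodProxy(Class<?> clazz) {
            this.clazz = clazz;
        }

        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            // Methods declared on Object (toString, hashCode, ...) are handled locally
            if (Object.class.equals(method.getDeclaringClass())) {
                try {
                    return method.invoke(this, args);
                } catch (Throwable t) {
                    t.printStackTrace();
                }
            } else {
                // Interface methods are sent over the network
                return rpcInvoke(method, args);
            }
            return null;
        }

        // Pack the call into the custom protocol, send it, and wait for the response
        public Object rpcInvoke(Method method, Object[] args) {
            InvokerMessage invokerMessage = new InvokerMessage();
            invokerMessage.setClassName(this.clazz.getName());
            invokerMessage.setMethodName(method.getName());
            invokerMessage.setValues(args);
            invokerMessage.setParams(method.getParameterTypes());

            final RpcProxyHandler consumerHandler = new RpcProxyHandler();
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                Bootstrap bootstrap = new Bootstrap();
                bootstrap.group(group)
                        .channel(NioSocketChannel.class)
                        .option(ChannelOption.TCP_NODELAY, true)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            public void initChannel(SocketChannel ch) throws Exception {
                                ChannelPipeline pipeline = ch.pipeline();
                                // Frame codecs
                                pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
                                pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
                                // Object encoder
                                pipeline.addLast("encoder", new ObjectEncoder());
                                // Object decoder
                                pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
                                pipeline.addLast("handler", consumerHandler);
                            }
                        });
                ChannelFuture future = bootstrap.connect("localhost", 8080).sync();
                future.channel().writeAndFlush(invokerMessage).sync();
                // Block until the registry closes the connection after responding
                future.channel().closeFuture().sync();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                group.shutdownGracefully();
            }
            return consumerHandler.getResponse();
        }
    }
}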

From the interface passed in, we obtain the service to invoke, the method name, the parameter type list, and the parameter values, which together fill in the custom RPC protocol packet. All that remains is to send the packet and wait for the result. To receive the return value, we also need a custom inbound Handler. The RpcProxyHandler code is as follows:

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class RpcProxyHandler extends ChannelInboundHandlerAdapter {
    private Object result;

    public Object getResponse() {
        return result;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // Keep the decoded return value so the proxy can hand it back to the caller
        result = msg;
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("client exception is general");
    }
}

Now that the process is complete, let’s test it. The RpcConsumer.java code is as follows:

import edu.xpu.rpc.api.IRpcCalc;
import edu.xpu.rpc.api.IRpcHello;
import edu.xpu.rpc.consumer.proxy.RpcProxy;

public class RpcConsumer {
    public static void main(String[] args) {
        // A plain local call would look like this:
        // IRpcHello iRpcHello = new RpcHelloProvider();
        // iRpcHello.hello("Tom");

        // The remote call goes through the dynamic proxy instead
        IRpcHello rpcHello = RpcProxy.create(IRpcHello.class);
        System.out.println(rpcHello.hello("ZouChangLin"));

        int a = 10;
        int b = 5;
        IRpcCalc iRpcCalc = RpcProxy.create(IRpcCalc.class);
        System.out.println(String.format("%d + %d = %d", a, b, iRpcCalc.add(a, b)));
        System.out.println(String.format("%d - %d = %d", a, b, iRpcCalc.sub(a, b)));
        System.out.println(String.format("%d * %d = %d", a, b, iRpcCalc.mul(a, b)));
        System.out.println(String.format("%d / %d = %d", a, b, iRpcCalc.div(a, b)));
    }
}

3. Testing

Start the Registry first; it listens on port 8080.

Then start the consumer to make the calls.

After the calls complete, you can see that the results are correct, and the requests and results are also logged by the Registry.

As you can see, the simple RPC framework is complete!