In the Internet era, all kinds of RPC frameworks prevail. If you look closely at all kinds of frameworks, there are various changes in application level. However, the most core part of RPC is basically the same.


That is, it communicates with the remote server and invokes the remote service as if it were a local service.


Then on top of that you might add features like automatic service registration and discovery, load balancing, nearby routing, call link logging, remote mock, and so on.


Today, I want to share with you that if you don’t consider performance, API ease of use, service discovery, load balancing, environment isolation and other factors, you can actually build a basic RPC framework in a few minutes.


Personally, I think the following are the cornerstones of RPC: 1. Serialization protocol 2. Remote communication message protocol 3.


Serialization protocol JDK provides a set of native serialization protocols for us, although it is not cross-language and the compression rate is not high, but we only show RPC technology, not other things. If you feel bad, you can use Hessian, Protobuf, or even open source frameworks such as FastJSON serialization.


Remote communication JDK has a socket and I/O stream, although it is synchronous, performance is not good, but just to show the principle of RPC technology, not considering other, for performance and throughput we can choose Netty for transformation.


Communication protocol, I only made a simple MAGIC_NUM+ two byte packet length + serialized object byte stream protocol, protocol can add a lot of things, such as version number, heartbeat packet, status code, extensible packet, etc., but again, here just to show the principle of RPC, not consider the other.


Protocol message handling part through the style just inside the class name of the carrying method name, method arguments, do a simple reflection processing, this part can extend the part a lot, actually do the method cache in advance, for example, the method signature using the short name registration, etc., or you want to faster can also automatically generated by means of byte code into some template code, Turn reflection into a direct method call.





So let’s go straight to the code


The first is that the objects transferred are all Java serializable objects:


public class RpcCommand implements Serializable{ String className ; String methodName ; String[] argumetsType ; Object[] params ; public String getClassName() { return className; } public void setClassName(String className) { this.className = className; } public String getMethodName() { return methodName; } public void setMethodName(String methodName) { this.methodName = methodName; } public String[] getArgumetsType() { return argumetsType; } public void setArgumetsType(String[] argumetsType) { this.argumetsType = argumetsType; } public Object[] getParams() { return params; } public void setParams(Object[] params) { this.params = params; } } public class RpcResponse implements Serializable{ boolean isException; Object result ; Exception exception; public boolean isException() { return isException; } public void setException(boolean exception) { isException = exception; } public Object getResult() { return result; } public void setResult(Object result) { this.result = result; } public Exception getException() { return exception; } public void setException(Exception exception) { this.exception = exception; }}Copy the code

 

The second is the processing part of the request object


package com.shock.rpc; import java.lang.reflect.Method; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ConcurrentHashMap; /** * ${DESCRIPTION} * com.shock.rpc.${CLASS_NAME} * Created by zhengdong.lzd on 2016/11/29 0029. */ public class RpcHandler { ConcurrentHashMap<String, Object> registered = new ConcurrentHashMap<String, Object>(128); public RpcResponse handler(RpcCommand commond) { String className = commond.getClassName(); RpcResponse response = new RpcResponse(); try { Object obj = registered.get(className); String[] argTypes = commond.getArgumetsType(); Class aClass = Class.forName(className); List<Class> argsTypeList = new ArrayList<Class>(argTypes.length); for (String s : argTypes) { argsTypeList.add(Class.forName(s)); } Method method = aClass.getMethod(commond.getMethodName(), argsTypeList.toArray(new Class[argsTypeList.size()])); Object object = method.invoke(obj, commond.getParams()); response.setResult(object); } catch (Exception e) { e.printStackTrace(); response.setException(true); response.setException(e); } return response; } public void regist(Class interfa, Object object) { registered.put(interfa.getName(), object); }}Copy the code

There is only a very rough reflection implementation in the code and the third one is the server startup and server protocol handling code

package com.shock.rpc; import com.shock.rpc.demo.IDemoImpl; import com.shock.rpc.demo.IDemoInterface; import java.io.*; import java.net.ServerSocket; import java.net.Socket; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * ${DESCRIPTION} * com.shock.rpc.${CLASS_NAME} * Created by zhengdong.lzd on 2016/11/29 0029. */ public class RpcServer { int port; public RpcServer(int port, RpcHandler handler) { this.port = port; this.handler = handler; } RpcHandler handler; ExecutorService executorService = Executors.newFixedThreadPool(20); public void start() { try { ServerSocket serverSocket = new ServerSocket(port); while (true) { Socket socket = serverSocket.accept(); executorService.submit(new WorkThread(socket)); } } catch (IOException e) { e.printStackTrace(); } } public class WorkThread implements Runnable { Socket socket; WorkThread(Socket socket) { this.socket = socket; } @Override public void run() { try { InputStream inputStream = socket.getInputStream(); OutputStream outputStream = socket.getOutputStream(); while (true) { int magic = inputStream.read(); Int length1 = inputstream.read (); length1 = inputstream.read (); int length2 = inputStream.read(); int length = (length1 << 8) + length2; ByteArrayOutputStream bout = new ByteArrayOutputStream(length); int sum = 0; byte[] bs = new byte[length]; while (true) { int readLength = inputStream.read(bs, 0, length - sum); if (readLength > 0) { bout.write(bs, 0, readLength); sum += readLength; } if (sum >= length) { break; } } ObjectInputStream objectInputStream = new ObjectInputStream( new ByteArrayInputStream(bout.toByteArray())); try { RpcCommand commond = (RpcCommand) objectInputStream.readObject(); RpcResponse response = handler.handler(commond); ByteArrayOutputStream objectout = new ByteArrayOutputStream(length); ObjectOutputStream objectOutputStream = new ObjectOutputStream(objectout); objectOutputStream.writeObject(response); objectOutputStream.flush(); byte[] commondBytes = objectout.toByteArray(); int len = commondBytes.length; outputStream.write(0x5A); outputStream.write(len >> 8); outputStream.write(len & 0x00FF); outputStream.write(commondBytes); outputStream.flush(); } catch (Exception e) { e.printStackTrace(); } } } } catch (IOException e) { e.printStackTrace(); System.out.println(" The client is disconnected "); } finally { if (socket ! = null) { try { socket.close(); } catch (Exception e) { e.printStackTrace(); } } } } } public static void main(String[] args) { RpcHandler rpcHandler = new RpcHandler(); rpcHandler.regist(IDemoInterface.class, new IDemoImpl()); RpcServer servcer = new RpcServer(8081, rpcHandler); servcer.start(); }}Copy the code

Code implementation is also very simple, that is, according to the previous transmission message protocol read the transmission message, deserialize the request object RpcCommand, to process the class, if the compatibility plus versions and different protocols, you can add different processing implementation.

Finally, the client-side transport and protocol handling code


package com.shock.rpc; import com.shock.rpc.demo.IDemoInterface; import java.io.*; import java.net.InetSocketAddress; import java.net.Socket; /** * ${DESCRIPTION} * com.shock.rpc.${CLASS_NAME} * Created by zhengdong.lzd on 2016/11/29 0029. */ public class RpcClient { String host; int port; Socket socket; InputStream inputStream; OutputStream outputStream; public RpcClient(String host, int port) { try { socket = new Socket(); socket.connect(new InetSocketAddress(host, port)); inputStream = socket.getInputStream(); outputStream = socket.getOutputStream(); } catch (IOException e) { e.printStackTrace(); }} // This can not be concurrent requests, Public synchronized RpcResponse invoke(RpcCommand commond) {RpcResponse response = new RpcResponse(); try { ByteArrayOutputStream objectout = new ByteArrayOutputStream(); ObjectOutputStream objectOutputStream = new ObjectOutputStream(objectout); objectOutputStream.writeObject(commond); objectOutputStream.flush(); byte[] commondBytes = objectout.toByteArray(); outputStream.write(0x5A); int len = commondBytes.length; outputStream.write(len >> 8); outputStream.write(0x00FF & len); outputStream.write(commondBytes); outputStream.flush(); while (true) { int magic = inputStream.read(); if (magic == 0x5A) { int length1 = inputStream.read(); int length2 = inputStream.read(); int length = (length1 << 8) + length2; ByteArrayOutputStream bout = new ByteArrayOutputStream(length); int sum = 0; byte[] bs = new byte[length]; while (true) { int readLength = inputStream.read(bs, 0, length - sum); if (readLength > 0) { bout.write(bs, 0, readLength); sum += readLength; } if (sum >= length) { break; } } ObjectInputStream objectInputStream = new ObjectInputStream( new ByteArrayInputStream(bout.toByteArray())); RpcResponse response1 = (RpcResponse) objectInputStream.readObject(); return response1; } } } catch (IOException e) { e.printStackTrace(); } catch (ClassNotFoundException e) { e.printStackTrace(); } return response; } public static void main(String[] args) { RpcClient client = new RpcClient("localhost", 8081); RpcCommand command = new RpcCommand(); command.setClassName(IDemoInterface.class.getName()); command.setMethodName("noArgument"); command.setArgumetsType(new String[0]); RpcResponse response = client.invoke(command); RpcCommand command2 = new RpcCommand(); command2.setClassName(IDemoInterface.class.getName()); command2.setMethodName("withReturn"); command2.setArgumetsType(new String[] { "java.lang.String" }); command2.setParams(new String[] { "shocklee" }); RpcResponse response2 = client.invoke(command2); System.out.println(response.getResult()); System.out.println(response2.getResult()); }}Copy the code

 

At this point, the framework is complete, and there is no common RPC client API wrappers, such as fetching a remote object from a container based on the interface and calling the remote object’s methods directly.


Finally, post a test class and interface


package com.shock.rpc.demo; /** * ${DESCRIPTION} * com.shock.rpc.demo.${CLASS_NAME} * Created by zhengdong.lzd on 2016/11/29 0029. */ public interface IDemoInterface { public String withReturn(String name); public void noReturn(String name); public String noArgument(); } package com.shock.rpc.demo; /** * ${DESCRIPTION} * com.shock.rpc.demo.${CLASS_NAME} * Created by zhengdong.lzd on 2016/11/29 0029. */ public class IDemoImpl implements IDemoInterface { @Override public String withReturn(String name) { System.out.println("withReturn "+name); return "hello " + name; } @Override public void noReturn(String name) { System.out.println("noReturn "+name); } @Override public String noArgument() { System.out.println("noArgument"); return "noArgument"; }}Copy the code

 

The whole RPC function has been posted, the code has not been cleaned up, the serialization/antisequence code has not been abstracted, the protocol part has not been abstracted, just want to write fast, can be written in a short time to correspond to the title ten minutes, so the code may be a little ugly, but the whole is shown, The critical code does not require the use of any third party frameworks and toolkits.





Welcome to clap bricks.





In addition, I wrote a slightly more complicated RPC on Github. If any students want to write and play together, please make an appointment immediately. The code address is


https://github.com/shocklee6315/simpleRpcServer