Implementing RPC with smart-socket

RPC is widely used in Internet services today. Basic introductions are easy to find online (for example via Baidu), so they are not repeated here. As the saying goes, it is better to travel thousands of miles than to read thousands of books: however many articles you read, implementing RPC yourself gives you a more thorough understanding of it. This article demonstrates the working principle of RPC and one way to implement it, from a purely technical perspective.

Before we begin, let’s list the technical points we need to use to implement RPC:

  1. Communication
  2. Serialization/deserialization
  3. Reflection
  4. Dynamic proxies

In this implementation, smart-socket handles the communication part. Everything else (serialization/deserialization, reflection, and dynamic proxies) uses the solutions provided by the JDK. Once you have mastered RPC, you can try rebuilding it with third-party technologies.

Terminology

  • Provider: the RPC service provider
  • Consumer: the RPC service caller

Message communication

Because RPC is a service that communicates across the network, we first define the communication rules, which involve both communication and serialization/deserialization.

Communication protocol

The communication protocol uses the simplest length+data format, and the codec is implemented as follows. For this example, we assume that readBuffer is large enough to hold a complete message. The data part of the protocol is the serialized byte array of an RPC message, which the Provider/Consumer must deserialize before processing the RPC call.

    import java.nio.ByteBuffer;
    // Protocol and AioSession are smart-socket types; import them from the smart-socket version you use.

    public class RpcProtocol implements Protocol<byte[]> {
        private static final int INTEGER_BYTES = Integer.SIZE / Byte.SIZE;

        @Override
        public byte[] decode(ByteBuffer readBuffer, AioSession<byte[]> session, boolean eof) {
            int remaining = readBuffer.remaining();
            // Wait until the 4-byte length field has arrived.
            if (remaining < INTEGER_BYTES) {
                return null;
            }
            // Peek at the total message size (length field + data) without consuming it.
            int messageSize = readBuffer.getInt(readBuffer.position());
            if (messageSize > remaining) {
                return null;
            }
            byte[] data = new byte[messageSize - INTEGER_BYTES];
            readBuffer.getInt(); // skip the length field
            readBuffer.get(data);
            return data;
        }

        @Override
        public ByteBuffer encode(byte[] msg, AioSession<byte[]> session) {
            ByteBuffer byteBuffer = ByteBuffer.allocate(msg.length + INTEGER_BYTES);
            // The length field counts itself plus the data.
            byteBuffer.putInt(byteBuffer.capacity());
            byteBuffer.put(msg);
            byteBuffer.flip();
            return byteBuffer;
        }
    }
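
The codec above assumes that readBuffer holds a complete message. To look at the length+data rule in isolation, here is a minimal sketch that uses only java.nio.ByteBuffer and no smart-socket types. The class name FrameCodecSketch, its method names, and the extra sanity check on the length field are assumptions made for illustration, not part of the smart-socket project.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class FrameCodecSketch {
    private static final int INTEGER_BYTES = Integer.BYTES;

    /** Frames a payload as [total length][payload]; the length includes its own 4 bytes. */
    static ByteBuffer encode(byte[] payload) {
        ByteBuffer buffer = ByteBuffer.allocate(INTEGER_BYTES + payload.length);
        buffer.putInt(buffer.capacity());
        buffer.put(payload);
        buffer.flip();
        return buffer;
    }

    /** Returns one payload, or null if the buffer does not yet hold a complete frame. */
    static byte[] decode(ByteBuffer readBuffer) {
        if (readBuffer.remaining() < INTEGER_BYTES) {
            return null; // the length field itself is incomplete
        }
        // Absolute read: peeks at the length without moving the position.
        int frameSize = readBuffer.getInt(readBuffer.position());
        if (frameSize < INTEGER_BYTES) {
            // Assumption for this sketch: reject corrupt lengths instead of
            // failing later with a negative array size.
            throw new IllegalStateException("invalid frame size: " + frameSize);
        }
        if (frameSize > readBuffer.remaining()) {
            return null; // wait for more bytes
        }
        readBuffer.getInt(); // consume the length field
        byte[] payload = new byte[frameSize - INTEGER_BYTES];
        readBuffer.get(payload);
        return payload;
    }

    public static void main(String[] args) {
        ByteBuffer frame = encode("hello".getBytes(StandardCharsets.UTF_8));

        // Simulate a half-received frame: only the first 6 bytes are visible.
        ByteBuffer partial = frame.duplicate();
        partial.limit(6);
        System.out.println("partial frame decoded: " + (decode(partial) != null));

        byte[] payload = decode(frame);
        System.out.println(new String(payload, StandardCharsets.UTF_8));
    }
}
```

Because the length is read with an absolute getInt, an incomplete frame leaves the buffer position untouched, so the same bytes can be decoded again once the rest has arrived.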

RPC request message

The RPC request message is sent by the Consumer, and the Consumer needs to provide enough information in the request message for the Provider to accurately identify the service interface. The core elements include:

  1. uuid: a unique identifier of the request message, used to associate the response message with its request.
  2. interfaceClass: the name of the API interface the Consumer wants to call.
  3. method: the name of the interface method the Consumer wants to execute.
  4. paramClassList: the parameter types of the method being called. They distinguish methods that share a name but take different parameters.
  5. params: the argument values the Consumer passes to the method.

    public class RpcRequest implements Serializable {
        /** Unique identifier of the message */
        private final String uuid = UUID.randomUUID().toString();

        /** Interface name */
        private String interfaceClass;

        /** Method name */
        private String method;

        /** Parameter type names */
        private String[] paramClassList;

        /** Argument values */
        private Object[] params;

        // getX/setX()
    }
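
To see why the method name alone is not enough, the following sketch lists the parameter type names of two overloads through reflection. The Calculator class and the helper names are hypothetical and exist only for this illustration; the strings it prints are the kind of values that paramClassList would carry.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class OverloadLookupSketch {
    /** Hypothetical service with two overloads of the same method name. */
    public static class Calculator {
        public int sum(int a, int b) { return a + b; }
        public String sum(String a, String b) { return a + b; }
    }

    /** Converts a method's parameter types to class-name strings. */
    static String[] toParamClassList(Method method) {
        Class<?>[] types = method.getParameterTypes();
        String[] names = new String[types.length];
        for (int i = 0; i < types.length; i++) {
            names[i] = types[i].getName();
        }
        return names;
    }

    /** Collects the parameter lists of every public method with the given name. */
    static List<String> signatures(Class<?> type, String methodName) {
        List<String> result = new ArrayList<>();
        for (Method m : type.getMethods()) {
            if (m.getName().equals(methodName)) {
                result.add(String.join(",", toParamClassList(m)));
            }
        }
        Collections.sort(result); // getMethods() returns methods in no particular order
        return result;
    }

    public static void main(String[] args) {
        // Both entries share the name "sum"; only the type names tell them apart.
        for (String signature : signatures(Calculator.class, "sum")) {
            System.out.println("sum(" + signature + ")");
        }
    }
}
```

Note that a primitive type reports its keyword as its name ("int"), while a reference type reports its fully qualified class name.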

RPC response message

The RPC response message is the carrier in which the Provider sends the interface execution result back to the Consumer.

  • uuid: the same value as in the corresponding RPC request message.
  • returnObject: the return value of the RPC interface call.
  • returnType: the type of the return value.
  • exception: the exception message, if the call throws an exception.

    public class RpcResponse implements Serializable {
        /** Unique identifier of the message, the same as the uuid of the corresponding RpcRequest */
        private String uuid;

        /** Return object */
        private Object returnObject;

        /** Return object type */
        private String returnType;

        /** Exception message */
        private String exception;

        public RpcResponse(String uuid) {
            this.uuid = uuid;
        }

        // getX/setX()
    }
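
The uuid is what lets a Consumer pair each response with the request that caused it, even when responses arrive out of order. The article does not show how the example project does this, so the following is only one possible sketch, built on CompletableFuture; the class PendingCallsSketch and its methods are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public class PendingCallsSketch {
    // uuid -> future that is completed when the matching response arrives
    private final Map<String, CompletableFuture<Object>> pending = new ConcurrentHashMap<>();

    /** Called before a request is written to the network. */
    CompletableFuture<Object> register(String uuid) {
        CompletableFuture<Object> future = new CompletableFuture<>();
        pending.put(uuid, future);
        return future;
    }

    /** Called when a response is decoded; returns false if nobody is waiting for this uuid. */
    boolean complete(String uuid, Object returnObject, String exception) {
        CompletableFuture<Object> future = pending.remove(uuid);
        if (future == null) {
            return false;
        }
        if (exception != null) {
            future.completeExceptionally(new RuntimeException(exception));
        } else {
            future.complete(returnObject);
        }
        return true;
    }

    public static void main(String[] args) {
        PendingCallsSketch calls = new PendingCallsSketch();
        CompletableFuture<Object> first = calls.register("uuid-1");
        CompletableFuture<Object> second = calls.register("uuid-2");

        // Responses arrive in reverse order; the uuid still routes each to its caller.
        calls.complete("uuid-2", "second result", null);
        calls.complete("uuid-1", "first result", null);

        System.out.println(first.join());
        System.out.println(second.join());
    }
}
```

A real Consumer would also need a timeout so that a lost response does not leave a caller waiting forever.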

That completes the message design for RPC communication. To convert RpcRequest and RpcResponse into the byte array format required by the communication protocol, we use the serialization mechanism provided by the JDK (not recommended in a production environment).

  • Serialization
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
ObjectOutput objectOutput = new ObjectOutputStream(byteArrayOutputStream);
objectOutput.writeObject(request);
aioSession.write(byteArrayOutputStream.toByteArray());
  • Deserialization
ObjectInputStream objectInput = new ObjectInputStream(new ByteArrayInputStream(msg));
RpcResponse resp = (RpcResponse) objectInput.readObject();
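
The two fragments above can be combined into a self-contained round trip. This is a minimal sketch using only JDK classes; the class JdkSerializationSketch and its helper methods are hypothetical, and wrapping the checked exceptions in unchecked ones is a choice made here to keep the example short.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Arrays;

public class JdkSerializationSketch {
    /** Serializes an object into the byte array that becomes the data part of a message. */
    static byte[] serialize(Serializable obj) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // Read the bytes only after the stream is closed, so everything is flushed.
        return bytes.toByteArray();
    }

    /** Rebuilds an object from the data part of a message. */
    static Object deserialize(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        String[] original = {"test", "sum"};
        byte[] wire = serialize(original);
        String[] copy = (String[]) deserialize(wire);
        System.out.println(wire.length + " bytes -> " + Arrays.toString(copy));
    }
}
```

One reason JDK serialization is discouraged in production is that readObject will instantiate whatever classes the incoming bytes name, which is risky when the bytes come from an untrusted peer.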

RPC Service Implementation

The scheme above solves the communication problem of RPC. Next, we need to implement the service capability on top of these messages.

Consumer

The Consumer side of RPC has only the interfaces and no concrete implementation, yet we expect the same experience as calling a local service. So we need to instantiate the interfaces as objects that can serve calls across applications, which is where dynamic proxies come in. When the Consumer invokes an RPC interface method, the proxy internally sends a request message to the Provider and obtains the result.

    obj = (T) Proxy.newProxyInstance(getClass().getClassLoader(), new Class[]{remoteInterface},
            new InvocationHandler() {
                @Override
                public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                    // Describe the call in a request message.
                    RpcRequest req = new RpcRequest();
                    req.setInterfaceClass(remoteInterface.getName());
                    req.setMethod(method.getName());
                    Class<?>[] types = method.getParameterTypes();
                    if (!ArrayUtils.isEmpty(types)) {
                        String[] paramClass = new String[types.length];
                        for (int i = 0; i < types.length; i++) {
                            paramClass[i] = types[i].getName();
                        }
                        req.setParamClassList(paramClass);
                    }
                    req.setParams(args);

                    // Send the request to the Provider and get the response.
                    RpcResponse rmiResp = sendRpcRequest(req);
                    if (StringUtils.isNotBlank(rmiResp.getException())) {
                        throw new RuntimeException(rmiResp.getException());
                    }
                    return rmiResp.getReturnObject();
                }
            });
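
The proxy mechanism can be tried without any network at all. In the sketch below, a plain Function stands in for the step that sends the request and waits for the response; the Greeter interface, the Call class, and createProxy are hypothetical names used only for this illustration.

```java
import java.lang.reflect.Proxy;
import java.util.function.Function;

public class ProxySketch {
    /** Hypothetical remote interface; the Consumer has no implementation of it. */
    public interface Greeter {
        String greet(String name);
    }

    /** Minimal stand-in for the request fields that the proxy fills in. */
    public static class Call {
        public final String interfaceClass;
        public final String method;
        public final Object[] params;

        Call(String interfaceClass, String method, Object[] params) {
            this.interfaceClass = interfaceClass;
            this.method = method;
            this.params = params;
        }
    }

    /** Creates an object that implements the interface by forwarding every call to the transport. */
    static <T> T createProxy(Class<T> remoteInterface, Function<Call, Object> transport) {
        Object proxy = Proxy.newProxyInstance(
                remoteInterface.getClassLoader(),
                new Class<?>[]{remoteInterface},
                (self, method, args) ->
                        transport.apply(new Call(remoteInterface.getName(), method.getName(), args)));
        return remoteInterface.cast(proxy);
    }

    public static void main(String[] args) {
        // The lambda plays the role of "send request, wait for response".
        Greeter greeter = createProxy(Greeter.class,
                call -> "Hello, " + call.params[0] + " (via " + call.method + ")");
        System.out.println(greeter.greet("smart-socket"));
    }
}
```

Methods inherited from Object, such as toString and hashCode, also pass through the invocation handler, so a fuller implementation would usually handle them locally instead of sending them to the Provider.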

Provider

The Provider keeps the RPC services it offers in a Map: the key is the exposed interface name, and the value is the implementation of that interface. When the Provider receives an RPC request message, it only needs to find and execute the corresponding service according to the message content, then return the result to the Consumer as a response message.

    ObjectInputStream objectInput = new ObjectInputStream(new ByteArrayInputStream(msg));
    RpcRequest req = (RpcRequest) objectInput.readObject();
    RpcResponse resp = new RpcResponse(req.getUuid());
    try {
        String[] paramClassList = req.getParamClassList();
        Object[] paramObjList = req.getParams();
        // Get the input parameter types
        Class<?>[] classArray = null;
        if (paramClassList != null) {
            classArray = new Class[paramClassList.length];
            for (int i = 0; i < classArray.length; i++) {
                // Primitive type names are looked up first; other names go through Class.forName
                Class<?> clazz = primitiveClass.get(paramClassList[i]);
                if (clazz == null) {
                    classArray[i] = Class.forName(paramClassList[i]);
                } else {
                    classArray[i] = clazz;
                }
            }
        }
        // Find the published implementation by interface name
        Object impObj = impMap.get(req.getInterfaceClass());
        if (impObj == null) {
            throw new UnsupportedOperationException("can not find interface: " + req.getInterfaceClass());
        }
        // Invoke the requested method via reflection
        Method method = impObj.getClass().getMethod(req.getMethod(), classArray);
        Object obj = method.invoke(impObj, paramObjList);
        resp.setReturnObject(obj);
        resp.setReturnType(method.getReturnType().getName());
    } catch (InvocationTargetException e) {
        LOGGER.error(e.getMessage(), e);
        resp.setException(e.getTargetException().getMessage());
    } catch (Exception e) {
        LOGGER.error(e.getMessage(), e);
        resp.setException(e.getMessage());
    }
    // Serialize the response and send it back to the Consumer
    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
    ObjectOutput objectOutput = new ObjectOutputStream(byteArrayOutputStream);
    objectOutput.writeObject(resp);
    session.write(byteArrayOutputStream.toByteArray());
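
The lookup-and-invoke step can be exercised on its own, without sockets or serialization. The sketch below assumes a registry keyed by interface name and a small table of primitive types, which is needed because Class.forName cannot resolve names such as "int". DispatchSketch, Adder, AdderImpl, publish, and dispatch are hypothetical names, not the classes of the example project.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

public class DispatchSketch {
    /** Hypothetical service interface and implementation. */
    public interface Adder {
        int sum(int a, int b);
    }

    public static class AdderImpl implements Adder {
        @Override
        public int sum(int a, int b) {
            return a + b;
        }
    }

    // Class.forName("int") fails, so primitive names are resolved from this table.
    private static final Map<String, Class<?>> PRIMITIVES = new HashMap<>();
    static {
        Class<?>[] primitives = {boolean.class, byte.class, char.class, short.class,
                int.class, long.class, float.class, double.class};
        for (Class<?> c : primitives) {
            PRIMITIVES.put(c.getName(), c);
        }
    }

    // interface name -> implementation object
    private final Map<String, Object> services = new HashMap<>();

    <T> void publish(Class<T> api, T impl) {
        services.put(api.getName(), impl);
    }

    /** Finds the implementation and invokes the named method with the given arguments. */
    Object dispatch(String interfaceClass, String methodName, String[] paramClassList, Object[] params) {
        Object impl = services.get(interfaceClass);
        if (impl == null) {
            throw new UnsupportedOperationException("can not find interface: " + interfaceClass);
        }
        try {
            Class<?>[] types = new Class<?>[paramClassList.length];
            for (int i = 0; i < types.length; i++) {
                Class<?> primitive = PRIMITIVES.get(paramClassList[i]);
                types[i] = primitive != null ? primitive : Class.forName(paramClassList[i]);
            }
            Method method = impl.getClass().getMethod(methodName, types);
            return method.invoke(impl, params);
        } catch (InvocationTargetException e) {
            // The service method itself threw; report its exception as the cause.
            throw new IllegalStateException(e.getTargetException());
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        DispatchSketch provider = new DispatchSketch();
        provider.publish(Adder.class, new AdderImpl());
        Object result = provider.dispatch(Adder.class.getName(), "sum",
                new String[]{"int", "int"}, new Object[]{1, 2});
        System.out.println(result);
    }
}
```

The implementation class is public here on purpose: looking a method up through getClass() and invoking it reflectively can fail with an access error when the implementing class is not accessible to the caller.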

Testing RPC Services

The server defines the interface DemoApi and registers an instance of its implementation, DemoApiImpl, with the Provider.

    public class Provider {
        public static void main(String[] args) throws IOException {
            RpcProviderProcessor rpcProviderProcessor = new RpcProviderProcessor();
            AioQuickServer<byte[]> server = new AioQuickServer<>(8888, new RpcProtocol(), rpcProviderProcessor);
            server.start();
            rpcProviderProcessor.publishService(DemoApi.class, new DemoApiImpl());
        }
    }

The Consumer calls the RPC interface methods test and sum and prints the results.

    public class Consumer {
        public static void main(String[] args) throws InterruptedException, ExecutionException, IOException {
            RpcConsumerProcessor rpcConsumerProcessor = new RpcConsumerProcessor();
            AioQuickClient<byte[]> consumer = new AioQuickClient<>("localhost", 8888, new RpcProtocol(), rpcConsumerProcessor);
            consumer.start();
            DemoApi demoApi = rpcConsumerProcessor.getObject(DemoApi.class);
            System.out.println(demoApi.test("smart-socket"));
            System.out.println(demoApi.sum(1, 2));
        }
    }

Conclusion

This article briefly describes the key parts of an RPC implementation. Providing a stable and reliable RPC service requires attention to many more details, which interested readers can study on their own. The complete code for this article's example is available from the smart-socket project.