Original address: PJMike’s blog

Preface

There are already plenty of examples on the web of how to build an RPC framework with Netty, and their authors may well do it better than I can. So why write another article? Two reasons:

  • First, after learning Netty you need to keep practicing to really master it, and implementing an RPC framework on top of Netty is good practice.
  • Second, many RPC frameworks in use today, such as Dubbo, use Netty as their communication layer, so working through this example gives a better feel for how an RPC framework is designed.

The following points illustrate how to implement a simple RPC framework based on Netty:

  • What is RPC?
  • What are the aspects of implementing an RPC framework?
  • How to use Netty?

What is RPC?

RPC (remote procedure call) lets you invoke a remote service as if it were a local call; it is a form of inter-process communication. Most readers will already know the concept. After reading an RPC article by Shen Jian of 58, I realized that RPC can be understood from a different angle: start from a local call and derive the RPC call from it.

1. Local function calls

Local functions are common, as in the following example:

public String sayHello(String name) {
    return "hello, " + name;
}

All we need to do is pass in an argument and call the sayHello method to get an output: input parameter -> method body -> output. The input parameter, the output, and the method body all live in the same process space. This is a local function call.

2. Socket communication

Is there a way to make such a call across processes? Suppose the caller is in process A and needs to call a method that lives in process B.

The most obvious approach is Socket communication. Sockets make cross-process calls possible, but the two sides have to agree on a communication protocol for passing the parameters, naming the function to call, and returning the result. There are further details to handle: for example, the parameters and the function name must be converted to a byte stream for network transmission (serialization) and converted back on the other side (deserialization). Programming directly against sockets is low-level and error-prone.

It would be a disaster if every caller had to worry about all of this. Is there a simple way for the caller to pass in arguments, call the method, and wait for the result, as if it were a local function?
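To make the burden concrete, here is a minimal sketch of calling sayHello across a raw socket using only the JDK. The class name RawSocketCall, the ad hoc wire format (method name followed by one string argument, written with writeUTF), and the single-shot server are assumptions made for illustration; both "processes" run in one JVM to keep the example self-contained.

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.ServerSocket;
import java.net.Socket;

final class RawSocketCall {
    // The method that lives in "process B".
    static String sayHello(String name) {
        return "hello, " + name;
    }

    // Server side: accept one connection, read method name and argument, write the result back.
    static void serveOnce(ServerSocket server) throws IOException {
        try (Socket socket = server.accept();
             DataInputStream in = new DataInputStream(socket.getInputStream());
             DataOutputStream out = new DataOutputStream(socket.getOutputStream())) {
            String method = in.readUTF();
            String arg = in.readUTF();
            String result = "sayHello".equals(method) ? sayHello(arg) : "unknown method: " + method;
            out.writeUTF(result);
            out.flush();
        }
    }

    // Client side: the caller has to do all the wire work by hand.
    static String call(int port, String method, String arg) throws IOException {
        try (Socket socket = new Socket("127.0.0.1", port);
             DataOutputStream out = new DataOutputStream(socket.getOutputStream());
             DataInputStream in = new DataInputStream(socket.getInputStream())) {
            out.writeUTF(method);
            out.writeUTF(arg);
            out.flush();
            return in.readUTF();
        }
    }

    // Runs server and client in one JVM for demonstration purposes.
    static String demo(String name) {
        try (ServerSocket server = new ServerSocket(0)) { // port 0 picks any free port
            Thread serverThread = new Thread(() -> {
                try {
                    serveOnce(server);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            serverThread.start();
            String result = call(server.getLocalPort(), "sayHello", name);
            serverThread.join();
            return result;
        } catch (IOException | InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo("netty"));
    }
}
```

Even for a single method with one string argument, the caller has to manage connections, streams, and a wire format itself, which is the work an RPC framework is meant to hide.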

3. RPC framework

An RPC framework solves exactly this problem. It lets the caller invoke a remote service as if it were a local function: the underlying communication details are transparent to the caller, and the complexity is hidden behind the framework.

What are the concerns of an RPC call?

The RPC framework lets callers invoke remote services as if they were local functions. How is this done?

From the caller's point of view, it simply calls a local function and passes in the parameters. It does not care about anything else; the communication details are left to the RPC framework. Under the hood, the RPC framework uses proxy classes, specifically dynamic proxies: a proxy class is generated at run time, and the communication details, such as parameter serialization, are implemented inside it.
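The idea can be sketched with the JDK's java.lang.reflect.Proxy. The names ProxySketch, GreetingService, and stub are hypothetical, and the handler only describes the call it intercepted; a real RPC client would serialize the method and arguments and send them over the network at that point.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.Arrays;

final class ProxySketch {
    // Hypothetical service interface; the caller only ever sees this interface.
    interface GreetingService {
        String greet(String name);
    }

    // Creates a proxy whose handler receives every call as (method, args).
    // Here the handler just describes the call instead of sending it anywhere.
    @SuppressWarnings("unchecked")
    static <T> T stub(Class<T> iface) {
        InvocationHandler handler = (proxy, method, args) ->
                "would send " + iface.getSimpleName() + "." + method.getName() + Arrays.toString(args);
        return (T) Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[] {iface}, handler);
    }

    public static void main(String[] args) {
        GreetingService service = stub(GreetingService.class);
        // Looks like a local call, but is routed through the handler.
        System.out.println(service.greet("netty"));
        // prints: would send GreetingService.greet[netty]
    }
}
```

The caller holds an ordinary GreetingService reference and never sees the handler, which is why the communication details can stay hidden.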

Serialization alone is not enough, of course. The two sides also need to agree on a communication protocol format that specifies, for example, the class name, the method name, the parameter types, and the parameter values of a request. The client serializes the request in this format and sends it over the network; the server decodes what it receives, according to the same format, back into a request object. This way the server knows exactly which method to call and which parameters to pass in.
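As a rough illustration of an agreed format, the sketch below writes the fields of a call in a fixed order and reads them back in the same order. It is a hand-rolled binary layout built on DataOutputStream, not the JSON format this article uses later; WireFormatSketch, Call, encode, and decode are hypothetical names, and arguments are limited to strings to keep it short.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

final class WireFormatSketch {
    // Hypothetical, simplified request: string arguments only.
    static final class Call {
        final String requestId;
        final String className;
        final String methodName;
        final String[] args;

        Call(String requestId, String className, String methodName, String... args) {
            this.requestId = requestId;
            this.className = className;
            this.methodName = methodName;
            this.args = args;
        }
    }

    // Agreed field order: requestId, className, methodName, argument count, arguments.
    static byte[] encode(Call call) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeUTF(call.requestId);
            out.writeUTF(call.className);
            out.writeUTF(call.methodName);
            out.writeInt(call.args.length);
            for (String arg : call.args) {
                out.writeUTF(arg);
            }
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // The receiver reads the fields back in exactly the same order.
    static Call decode(byte[] data) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            String requestId = in.readUTF();
            String className = in.readUTF();
            String methodName = in.readUTF();
            String[] args = new String[in.readInt()];
            for (int i = 0; i < args.length; i++) {
                args[i] = in.readUTF();
            }
            return new Call(requestId, className, methodName, args);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Call decoded = decode(encode(new Call("1", "HelloService", "hello", "pjmike")));
        System.out.println(decoded.className + "." + decoded.methodName + "(" + decoded.args[0] + ")");
    }
}
```

Whatever the concrete encoding, the point is the same: both sides must write and read the same fields in the same way.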

This brings us to network transmission, another important part of an RPC framework. Services are deployed on different hosts, so requests and responses have to travel between them quickly and reliably. This is where the hero of this article comes in: Netty, a high-performance network communication framework that is up to the task.

To sum up, an RPC framework needs to address the following points:

  • Proxy (dynamic proxy)
  • Communication protocol
  • Serialization
  • Network transmission

Of course, a good RPC framework has to deal with more than these points, but this article aims to build a simple one, and understanding the key points above is enough.

Implementing an RPC framework based on Netty

Now for the main part of this article. We will take the points an RPC implementation needs to address (proxy, serialization, protocol, codec) and implement them one by one with Netty.

1. Protocol

First of all, we need to determine the protocol format of the communication parties, the request object and the response object

Request object:

import lombok.Data;
import lombok.ToString;

@Data
@ToString
public class RpcRequest {
    /** ID of the request object */
    private String requestId;
    /** Class name */
    private String className;
    /** Method name */
    private String methodName;
    /** Parameter types */
    private Class<?>[] parameterTypes;
    /** Arguments */
    private Object[] parameters;
}

  • The request ID lets the client match each response from the server to the request it belongs to

Response object:

import lombok.Data;

@Data
public class RpcResponse {
    /** ID of the request this response belongs to */
    private String requestId;
    /** Error message */
    private String error;
    /** The returned result */
    private Object result;
}

2. Serialization

There are many serialization options available, such as JDK built-in serialization, Google Protobuf, Kryo, and Hessian. Anything other than JDK built-in serialization is a reasonable choice (the built-in mechanism performs poorly and produces a large byte stream). For convenience, this article uses JSON as the serialization format, with FastJSON as the JSON library.

For the convenience of subsequent extension, first define the serialization interface:

import java.io.IOException;

public interface Serializer {
    /**
     * Converts a Java object to binary.
     *
     * @param object the object to serialize
     * @return the serialized bytes
     */
    byte[] serialize(Object object) throws IOException;

    /**
     * Converts binary data back to a Java object.
     *
     * @param clazz the target class
     * @param bytes the serialized bytes
     * @param <T>   the target type
     * @return the deserialized object
     */
    <T> T deserialize(Class<T> clazz, byte[] bytes) throws IOException;
}

Since we are using JSON, we define the JSONSerializer implementation class:

import com.alibaba.fastjson.JSON;

public class JSONSerializer implements Serializer {

    @Override
    public byte[] serialize(Object object) {
        return JSON.toJSONBytes(object);
    }

    @Override
    public <T> T deserialize(Class<T> clazz, byte[] bytes) {
        return JSON.parseObject(bytes, clazz);
    }
}

If you want to use another serialization method later, you can write your own implementation of the Serializer interface.
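As a sketch of what such a swap could look like, the example below implements the same interface shape with JDK built-in serialization. JdkSerializer, Greeting, and the helper methods are hypothetical, and the Serializer interface is repeated inside the outer class only so that the example compiles on its own. It also measures how large the JDK byte stream is for a tiny object, which relates to the size concern mentioned earlier.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

final class SerializerSwapDemo {
    // Same shape as the article's Serializer interface, repeated for self-containment.
    interface Serializer {
        byte[] serialize(Object object) throws IOException;

        <T> T deserialize(Class<T> clazz, byte[] bytes) throws IOException;
    }

    // Hypothetical alternative implementation based on JDK built-in serialization.
    static final class JdkSerializer implements Serializer {
        @Override
        public byte[] serialize(Object object) throws IOException {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(object);
            }
            return bytes.toByteArray();
        }

        @Override
        public <T> T deserialize(Class<T> clazz, byte[] bytes) throws IOException {
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
                return clazz.cast(in.readObject());
            } catch (ClassNotFoundException e) {
                throw new IOException(e);
            }
        }
    }

    // Hypothetical payload; JDK serialization requires Serializable.
    static final class Greeting implements Serializable {
        private static final long serialVersionUID = 1L;
        final String text;

        Greeting(String text) {
            this.text = text;
        }
    }

    // Round-trips a Greeting through whichever Serializer is passed in.
    static String roundTrip(Serializer serializer, String text) {
        try {
            byte[] bytes = serializer.serialize(new Greeting(text));
            return serializer.deserialize(Greeting.class, bytes).text;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Size of the JDK-serialized form of a Greeting holding the given text.
    static int jdkSize(String text) {
        try {
            return new JdkSerializer().serialize(new Greeting(text)).length;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip(new JdkSerializer(), "hello"));
        System.out.println("JDK-serialized size in bytes: " + jdkSize("hello"));
    }
}
```

Because the codec only depends on the interface, switching implementations would not require changes to the encoder or decoder.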

3. Codec

Having agreed on the protocol format and serialization, we also need a codec: an encoder, which converts the request object into a format suitable for transport (typically a byte stream), and the corresponding decoder, which converts the network byte stream back into the application's message format.

Encoder implementation:

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

public class RpcEncoder extends MessageToByteEncoder<Object> {
    private Class<?> clazz;
    private Serializer serializer;

    public RpcEncoder(Class<?> clazz, Serializer serializer) {
        this.clazz = clazz;
        this.serializer = serializer;
    }

    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, Object msg, ByteBuf byteBuf) throws Exception {
        if (clazz != null && clazz.isInstance(msg)) {
            byte[] bytes = serializer.serialize(msg);
            // Frame layout: 4-byte length followed by the serialized body
            byteBuf.writeInt(bytes.length);
            byteBuf.writeBytes(bytes);
        }
    }
}

Decoder implementation:

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;

public class RpcDecoder extends ByteToMessageDecoder {
    private Class<?> clazz;
    private Serializer serializer;

    public RpcDecoder(Class<?> clazz, Serializer serializer) {
        this.clazz = clazz;
        this.serializer = serializer;
    }

    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
        // The encoder writes an int (4 bytes) up front to indicate the body length
        if (byteBuf.readableBytes() < 4) {
            return;
        }
        // Mark the current read position
        byteBuf.markReaderIndex();
        int dataLength = byteBuf.readInt();
        if (byteBuf.readableBytes() < dataLength) {
            // Body not fully received yet: rewind and wait for more data
            byteBuf.resetReaderIndex();
            return;
        }
        byte[] data = new byte[dataLength];
        // Read the data in byteBuf into the data byte array
        byteBuf.readBytes(data);
        Object obj = serializer.deserialize(clazz, data);
        list.add(obj);
    }
}
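The mark-and-rewind logic in the decoder is easier to follow in isolation. The sketch below reproduces it with java.nio.ByteBuffer instead of Netty's ByteBuf, so it runs with the JDK alone; LengthPrefixSketch, frame, drain, and demo are hypothetical names. The demo feeds two frames in two chunks, the first of which ends in the middle of the second frame.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

final class LengthPrefixSketch {
    // Frame = 4-byte length + payload, the same layout the encoder writes.
    static byte[] frame(String payload) {
        byte[] body = payload.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + body.length).putInt(body.length).put(body).array();
    }

    // Extracts every complete frame from buf (in read mode) and leaves a partial frame unread.
    static List<String> drain(ByteBuffer buf) {
        List<String> out = new ArrayList<>();
        while (buf.remaining() >= 4) {
            buf.mark();
            int length = buf.getInt();
            if (buf.remaining() < length) {
                buf.reset(); // body incomplete: rewind to before the length field
                break;
            }
            byte[] body = new byte[length];
            buf.get(body);
            out.add(new String(body, StandardCharsets.UTF_8));
        }
        return out;
    }

    // Two frames arrive as "frame a + first 5 bytes of frame b", then the rest of frame b.
    static List<String> demo() {
        byte[] a = frame("first");
        byte[] b = frame("second");
        ByteBuffer buf = ByteBuffer.allocate(64);

        buf.put(a).put(b, 0, 5);           // first chunk
        buf.flip();                        // switch to read mode
        List<String> seen = new ArrayList<>(drain(buf));

        buf.compact();                     // keep the unread bytes, back to write mode
        buf.put(b, 5, b.length - 5);       // second chunk
        buf.flip();
        seen.addAll(drain(buf));
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints: [first, second]
    }
}
```

After the first chunk only "first" is emitted; the half-received frame stays in the buffer until the remaining bytes arrive, which is the role resetReaderIndex plays in RpcDecoder.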

4. Netty client

Let’s take a look at how the Netty client is implemented, that is, how to start the client using Netty.

Anyone familiar with Netty will recognize the steps involved:

  • Write a startup method and specify the Channel type used for transport
  • Specify the ChannelHandler that reads and writes data on the connection
  • Add the codecs
  • Add a retry mechanism for failed connections
  • Add a method for sending request messages

Here is the concrete implementation code:

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;

import javax.annotation.PreDestroy;
import java.util.Date;
import java.util.concurrent.TimeUnit;

@Slf4j
public class NettyClient {
    private EventLoopGroup eventLoopGroup;
    private Channel channel;
    private ClientHandler clientHandler;
    private String host;
    private Integer port;
    private static final int MAX_RETRY = 5;

    public NettyClient(String host, Integer port) {
        this.host = host;
        this.port = port;
    }

    public void connect() {
        clientHandler = new ClientHandler();
        eventLoopGroup = new NioEventLoopGroup();
        // Bootstrap (startup) class
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(eventLoopGroup)
                // Specify the Channel used for the transmission
                .channel(NioSocketChannel.class)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .option(ChannelOption.TCP_NODELAY, true)
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        // Add the encoder
                        pipeline.addLast(new RpcEncoder(RpcRequest.class, new JSONSerializer()));
                        // Add the decoder
                        pipeline.addLast(new RpcDecoder(RpcResponse.class, new JSONSerializer()));
                        // Request handler
                        pipeline.addLast(clientHandler);
                    }
                });
        connect(bootstrap, host, port, MAX_RETRY);
    }

    /**
     * Reconnect-on-failure mechanism, following the Juejin booklet on getting started with Netty.
     *
     * @param bootstrap the client bootstrap
     * @param host      server host
     * @param port      server port
     * @param retry     remaining retry count
     */
    private void connect(Bootstrap bootstrap, String host, int port, int retry) {
        ChannelFuture channelFuture = bootstrap.connect(host, port).addListener(future -> {
            if (future.isSuccess()) {
                log.info("Connection to server successful");
            } else if (retry == 0) {
                log.error("Retry count exhausted, connection aborted");
            } else {
                // Which reconnect attempt this is
                int order = (MAX_RETRY - retry) + 1;
                // Delay before this reconnect attempt, in seconds
                int delay = 1 << order;
                log.error("{}: connection failed, reconnect attempt {}...", new Date(), order);
                bootstrap.config().group().schedule(() -> connect(bootstrap, host, port, retry - 1), delay, TimeUnit.SECONDS);
            }
        });
        // connect() is asynchronous: the channel is assigned before the connection attempt has completed
        channel = channelFuture.channel();
    }

    /**
     * Sends a request message.
     *
     * @param request the request to send
     * @return the matching response
     */
    public RpcResponse send(final RpcRequest request) {
        try {
            channel.writeAndFlush(request).await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return clientHandler.getRpcResponse(request.getRequestId());
    }

    @PreDestroy
    public void close() {
        eventLoopGroup.shutdownGracefully();
        channel.closeFuture().syncUninterruptibly();
    }
}
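The reconnect delay grows exponentially with each attempt. The arithmetic from the private connect method can be checked in isolation; RetryBackoffSketch, delaySeconds, and schedule are hypothetical names, while MAX_RETRY and the formula are taken from the client code above.

```java
import java.util.ArrayList;
import java.util.List;

final class RetryBackoffSketch {
    static final int MAX_RETRY = 5;

    // Same arithmetic as the client's reconnect logic: remaining retries -> delay in seconds.
    static int delaySeconds(int retriesLeft) {
        int order = (MAX_RETRY - retriesLeft) + 1; // 1 for the first reconnect, 5 for the last
        return 1 << order;                          // 2^order seconds
    }

    // Delays for all reconnect attempts, in the order they would be scheduled.
    static List<Integer> schedule() {
        List<Integer> delays = new ArrayList<>();
        for (int retriesLeft = MAX_RETRY; retriesLeft > 0; retriesLeft--) {
            delays.add(delaySeconds(retriesLeft));
        }
        return delays;
    }

    public static void main(String[] args) {
        System.out.println(schedule()); // prints: [2, 4, 8, 16, 32]
    }
}
```

With MAX_RETRY set to 5, the client waits 2, 4, 8, 16, and 32 seconds before the successive reconnect attempts and then gives up.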

Data processing is concentrated in the ClientHandler class, which extends ChannelDuplexHandler so that it can handle both outbound and inbound data.

import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ClientHandler extends ChannelDuplexHandler {
    /** Maps each request ID to the Future that will hold its response */
    private final Map<String, DefaultFuture> futureMap = new ConcurrentHashMap<>();

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof RpcResponse) {
            // Get the response object
            RpcResponse response = (RpcResponse) msg;
            DefaultFuture defaultFuture = futureMap.get(response.getRequestId());
            // Write the result to DefaultFuture
            defaultFuture.setResponse(response);
        }
        super.channelRead(ctx, msg);
    }

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        if (msg instanceof RpcRequest) {
            RpcRequest request = (RpcRequest) msg;
            // Before sending the request, save its ID and map it to a Future for the response
            futureMap.putIfAbsent(request.getRequestId(), new DefaultFuture());
        }
        super.write(ctx, msg, promise);
    }

    /**
     * Obtains the response for a request.
     *
     * @param requsetId the request ID
     * @return the matching response
     */
    public RpcResponse getRpcResponse(String requsetId) {
        try {
            DefaultFuture future = futureMap.get(requsetId);
            return future.getRpcResponse(10);
        } finally {
            // Remove the entry from the map once the result has been obtained
            futureMap.remove(requsetId);
        }
    }
}

Reference article: xilidou.com/2018/09/26/…

As the implementation above shows, we keep a Map from request ID to the Future that will hold the response, so that the client can match each server response to its request. A Netty channel may be used by multiple threads, so when a result comes back you do not know which thread's request it belongs to; hence the mapping.

The result is wrapped in a DefaultFuture. Because Netty is an asynchronous framework, results are delivered through Future and callback mechanisms.

public class DefaultFuture {
    private RpcResponse rpcResponse;
    private volatile boolean isSucceed = false;
    private final Object object = new Object();

    public RpcResponse getRpcResponse(int timeout) {
        synchronized (object) {
            // Keep waiting, in steps of timeout milliseconds, until a response has been set
            while (!isSucceed) {
                try {
                    object.wait(timeout);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            return rpcResponse;
        }
    }

    public void setResponse(RpcResponse response) {
        if (isSucceed) {
            return;
        }
        synchronized (object) {
            this.rpcResponse = response;
            this.isSucceed = true;
            object.notify();
        }
    }
}
  • We actually use wait and notify with a Boolean variable as an aid
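The same request/response correlation can also be expressed with the JDK's CompletableFuture. This is an alternative sketch, not the approach used in this article's code; PendingCalls and its methods are hypothetical names, and responses are plain strings for brevity.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class PendingCalls {
    // requestId -> future that completes when the matching response arrives
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Called just before a request is written to the channel.
    CompletableFuture<String> register(String requestId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(requestId, future);
        return future;
    }

    // Called when a response has been decoded; unknown IDs are ignored.
    void complete(String requestId, String result) {
        CompletableFuture<String> future = pending.remove(requestId);
        if (future != null) {
            future.complete(result);
        }
    }

    // Blocks the calling thread until the response arrives or the timeout elapses.
    static String await(CompletableFuture<String> future, long timeoutMillis) {
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("no response within " + timeoutMillis + " ms", e);
        }
    }

    // Two requests in flight; the responses arrive in reverse order from another thread.
    static String demo() {
        PendingCalls calls = new PendingCalls();
        CompletableFuture<String> a = calls.register("req-a");
        CompletableFuture<String> b = calls.register("req-b");
        new Thread(() -> {
            calls.complete("req-b", "B");
            calls.complete("req-a", "A");
        }).start();
        return await(a, 1000) + await(b, 1000);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints: AB
    }
}
```

Each caller gets the response for its own request ID regardless of arrival order. A fuller version would also remove the map entry when a wait times out.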

5. Netty server

The implementation on the Netty server side is similar to the implementation on the client side, except that after the request is decoded, the local function needs to be invoked by proxy. Here is the server-side code:

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;

import javax.annotation.PreDestroy;

@Slf4j
public class NettyServer implements InitializingBean {
    private EventLoopGroup boss = null;
    private EventLoopGroup worker = null;
    @Autowired
    private ServerHandler serverHandler;

    @Override
    public void afterPropertiesSet() throws Exception {
        // ZooKeeper serves as the registry; it is outside the scope of this article and can be ignored
        ServiceRegistry registry = new ZkServiceRegistry("127.0.0.1:2181");
        start(registry);
    }

    public void start(ServiceRegistry registry) throws Exception {
        // The thread pool that handles client connections
        boss = new NioEventLoopGroup();
        // The thread pool that handles reads and writes
        worker = new NioEventLoopGroup();
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(boss, worker)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        // Add the encoder
                        pipeline.addLast(new RpcEncoder(RpcResponse.class, new JSONSerializer()));
                        // Add the decoder
                        pipeline.addLast(new RpcDecoder(RpcRequest.class, new JSONSerializer()));
                        // Add the request handler
                        pipeline.addLast(serverHandler);
                    }
                });
        bind(serverBootstrap, 8888);
    }

    /**
     * If binding the port fails, increments the port number by 1 and tries again.
     *
     * @param serverBootstrap the server bootstrap
     * @param port            the port to bind
     */
    public void bind(final ServerBootstrap serverBootstrap, int port) {
        serverBootstrap.bind(port).addListener(future -> {
            if (future.isSuccess()) {
                log.info("Port [{}] bound successfully", port);
            } else {
                log.error("Port [{}] binding failed", port);
                bind(serverBootstrap, port + 1);
            }
        });
    }

    @PreDestroy
    public void destory() throws InterruptedException {
        boss.shutdownGracefully().sync();
        worker.shutdownGracefully().sync();
        log.info("Netty off");
    }
}

Here is the Handler class that handles reads and writes:

@Component
@Slf4j
@ChannelHandler.Sharable
public class ServerHandler extends SimpleChannelInboundHandler<RpcRequest> implements ApplicationContextAware {
    private ApplicationContext applicationContext;

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RpcRequest msg) {
        RpcResponse rpcResponse = new RpcResponse();
        rpcResponse.setRequestId(msg.getRequestId());
        try {
            Object handler = handler(msg);
            log.info("Get return result: {}", handler);
            rpcResponse.setResult(handler);
        } catch (Throwable throwable) {
            rpcResponse.setError(throwable.toString());
            throwable.printStackTrace();
        }
        ctx.writeAndFlush(rpcResponse);
    }

    /**
     * Processes a request on the server by invoking the target method reflectively.
     *
     * @param request the decoded request
     * @return the result of the invoked method
     */
    private Object handler(RpcRequest request) throws ClassNotFoundException, InvocationTargetException {
        // Use Class.forName to load the class
        Class<?> clazz = Class.forName(request.getClassName());
        Object serviceBean = applicationContext.getBean(clazz);
        log.info("serviceBean: {}", serviceBean);
        Class<?> serviceClass = serviceBean.getClass();
        log.info("serverClass:{}", serviceClass);
        String methodName = request.getMethodName();
        Class<?>[] parameterTypes = request.getParameterTypes();
        Object[] parameters = request.getParameters();

        // Invoke the method through CGLIB's FastClass reflection
        FastClass fastClass = FastClass.create(serviceClass);
        FastMethod fastMethod = fastClass.getMethod(methodName, parameterTypes);
        log.info("Start calling CGLIB dynamic proxy to execute server-side methods...");
        return fastMethod.invoke(serviceBean, parameters);
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }
}
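The dispatch step (look up the service by class name, then invoke the named method with the given parameter types and arguments) can be sketched with the JDK alone. Two substitutions are made here and should be read as assumptions: plain java.lang.reflect replaces CGLIB's FastClass, and a Map replaces the Spring ApplicationContext lookup. ReflectiveDispatchSketch, register, and dispatch are hypothetical names.

```java
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

final class ReflectiveDispatchSketch {
    interface HelloService {
        String hello(String name);
    }

    static final class HelloServiceImpl implements HelloService {
        @Override
        public String hello(String name) {
            return "hello, " + name;
        }
    }

    // Stand-in for the Spring bean lookup: interface name -> implementation instance.
    private final Map<String, Object> services = new HashMap<>();

    void register(Class<?> iface, Object impl) {
        services.put(iface.getName(), impl);
    }

    // Finds the service by class name and invokes the requested method reflectively.
    Object dispatch(String className, String methodName, Class<?>[] parameterTypes, Object[] parameters) {
        Object bean = services.get(className);
        if (bean == null) {
            throw new IllegalArgumentException("no service registered for " + className);
        }
        try {
            Method method = bean.getClass().getMethod(methodName, parameterTypes);
            return method.invoke(bean, parameters);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("invocation failed", e);
        }
    }

    public static void main(String[] args) {
        ReflectiveDispatchSketch dispatcher = new ReflectiveDispatchSketch();
        dispatcher.register(HelloService.class, new HelloServiceImpl());
        Object result = dispatcher.dispatch(
                HelloService.class.getName(), "hello",
                new Class<?>[] {String.class}, new Object[] {"pjmike"});
        System.out.println(result); // prints: hello, pjmike
    }
}
```

The four dispatch arguments correspond to the className, methodName, parameterTypes, and parameters fields of RpcRequest.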

6. Client proxy

The client uses a JDK dynamic proxy and implements the communication details inside the proxy. A JDK dynamic proxy requires an implementation of the InvocationHandler interface.

import lombok.extern.slf4j.Slf4j;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.util.Date;
import java.util.UUID;

@Slf4j
public class RpcClientDynamicProxy<T> implements InvocationHandler {
    private Class<T> clazz;

    public RpcClientDynamicProxy(Class<T> clazz) throws Exception {
        this.clazz = clazz;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        RpcRequest request = new RpcRequest();
        String requestId = UUID.randomUUID().toString();
        String className = method.getDeclaringClass().getName();
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();
        request.setRequestId(requestId);
        request.setClassName(className);
        request.setMethodName(methodName);
        request.setParameterTypes(parameterTypes);
        request.setParameters(args);
        log.info("Request content: {}", request);

        // Start the Netty client for direct connection
        NettyClient nettyClient = new NettyClient("127.0.0.1", 8888);
        log.info("Start connecting to server: {}", new Date());
        nettyClient.connect();
        RpcResponse send = nettyClient.send(request);
        log.info("Request call returns result: {}", send.getResult());
        return send.getResult();
    }
}

  • Encapsulate the request object in the invoke method, build the NettyClient object, and start the client to send the request message

The proxy factory class is as follows:

import java.lang.reflect.Proxy;

public class ProxyFactory {
    @SuppressWarnings("unchecked")
    public static <T> T create(Class<T> interfaceClass) throws Exception {
        return (T) Proxy.newProxyInstance(
                interfaceClass.getClassLoader(),
                new Class<?>[] {interfaceClass},
                new RpcClientDynamicProxy<T>(interfaceClass));
    }
}

  • Use Proxy.newProxyInstance to create a proxy for the interface

7. RPC remote call test

API:

public interface HelloService {
    String hello(String name);
}
  • Prepare a test API

Client:

import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
@Slf4j
public class ClientApplication {
    public static void main(String[] args) throws Exception {
        SpringApplication.run(ClientApplication.class, args);
        HelloService helloService = ProxyFactory.create(HelloService.class);
        log.info("Response result: {}", helloService.hello("pjmike"));
    }
}
  • The client invokes the interface’s methods

Server:

// Server implementation
@Service
public class HelloServiceImpl implements HelloService {
    @Override
    public String hello(String name) {
        return "hello, " + name;
    }
}


Summary

We have implemented a very simple RPC framework on top of Netty. It is far from a mature RPC framework (it does not even implement a basic registry), but the exercise gave me a deeper understanding of RPC and of what an RPC framework needs to take care of. When we later use a mature RPC framework such as Dubbo, we will know that it, too, uses Netty as its underlying communication framework, and digging into the source code of open-source RPC frameworks should be relatively easy.

Project address: github.com/pjmike/spri…

References & acknowledgements

  • xilidou.com/2018/09/26/…
  • www.cnblogs.com/luxiaoxun/p…
  • my.oschina.net/huangyong/b…
  • www.w3cschool.cn/architectro…
  • www.cnkirito.moe/dubbo27-fea…
  • juejin.cn/post/684490…
  • juejin.cn/book/684473…