serialization

Java handwritten RPC (01) based socket implementation from scratch

Java hand-written RPC (02) -Netty4 implements both client and server side from scratch

Java from scratch handwritten RPC (03) how to implement client call server?

In the previous sections we implemented the most basic client-side invocation of the server. In this section we will look at object serialization in communication.

Why serialization is needed

Netty uses ByteBuf to communicate.

Previously we used the encoder/decoder to handle the input/output parameters of the computation so that we could use poJOs directly.

One problem, however, is that if we want to abstract our project into a framework, we need to write encoders/decoders for all of our objects.

Obviously, writing a pair directly from each object is not practical, and how users will use it is unknown.

Serialization

Byte – based implementation, good performance, not high readability.

String-based implementations, such as JSON serialization, are readable but perform relatively poorly.

Ps: It can be selected according to personal well-being, and relevant serialization can be referred to below, which is not expanded here.

Introduction to json serialization framework

Implementation approach

We can convert all of our POJOs to byte, and then byte to ByteBuf.

And vice versa.

Code implementation

maven

Introduce serialization package:

<dependency>
    <groupId>com.github.houbb</groupId>
    <artifactId>json</artifactId>
    <version>while</version>
</dependency>
Copy the code

The service side

The core

Server-side code can be greatly simplified:

serverBootstrap.group(workerGroup, bossGroup)
    .channel(NioServerSocketChannel.class)
    // Prints logs
    .handler(new LoggingHandler(LogLevel.INFO))
    .childHandler(new ChannelInitializer<Channel>() {
        @Override
        protected void initChannel(Channel ch) throws Exception {
            ch.pipeline()
                    .addLast(newRpcServerHandler()); }})// This parameter affects connections that have not yet been accepted
    .option(ChannelOption.SO_BACKLOG, 128)
    // The server will send an ACK packet to determine if the client is still alive after a period of time when the client does not respond.
    .childOption(ChannelOption.SO_KEEPALIVE, true);
Copy the code

All you need is an implementation class.

RpcServerHandler

The serialization/deserialization of the server side is adjusted to use JsonBs directly.

package com.github.houbb.rpc.server.handler;

import com.github.houbb.json.bs.JsonBs;
import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import com.github.houbb.rpc.common.model.CalculateRequest;
import com.github.houbb.rpc.common.model.CalculateResponse;
import com.github.houbb.rpc.common.service.Calculator;
import com.github.houbb.rpc.server.service.CalculatorService;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

/ * * *@author binbin.hou
 * @since0.0.1 * /
public class RpcServerHandler extends SimpleChannelInboundHandler {

    private static final Log log = LogFactory.getLog(RpcServerHandler.class);

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        final String id = ctx.channel().id().asLongText();
        log.info("[Server] channel {} connected " + id);
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        final String id = ctx.channel().id().asLongText();

        ByteBuf byteBuf = (ByteBuf)msg;
        byte[] bytes = new byte[byteBuf.readableBytes()];
        byteBuf.readBytes(bytes);
        CalculateRequest request = JsonBs.deserializeBytes(bytes, CalculateRequest.class);
        log.info("[Server] receive channel {} request: {} from ", id, request);

        Calculator calculator = new CalculatorService();
        CalculateResponse response = calculator.sum(request);

        // Write back to the client
        byte[] responseBytes = JsonBs.serializeBytes(response);
        ByteBuf responseBuffer = Unpooled.copiedBuffer(responseBytes);
        ctx.writeAndFlush(responseBuffer);
        log.info("[Server] channel {} response {}", id, response); }}Copy the code

The client

The core

The client can be simplified as follows:

channelFuture = bootstrap.group(workerGroup)
    .channel(NioSocketChannel.class)
    .option(ChannelOption.SO_KEEPALIVE, true)
    .handler(new ChannelInitializer<Channel>(){
        @Override
        protected void initChannel(Channel ch) throws Exception {
            channelHandler = new RpcClientHandler();
            ch.pipeline()
                    .addLast(new LoggingHandler(LogLevel.INFO))
                    .addLast(channelHandler);
        }
    })
    .connect(RpcConstant.ADDRESS, port)
    .syncUninterruptibly();
Copy the code

RpcClientHandler

The serialization/deserialization of the client is adjusted to be implemented directly using JsonBs.

package com.github.houbb.rpc.client.handler;

import com.github.houbb.json.bs.JsonBs;
import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import com.github.houbb.rpc.client.core.RpcClient;
import com.github.houbb.rpc.common.model.CalculateResponse;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

/** * <p> Client processing class </p> ** <pre> Created: 2019/10/1611:30pm </pre> * <pre> Project: RPC </pre> **@author houbinbin
 * @sinceHundreds * /
public class RpcClientHandler extends SimpleChannelInboundHandler {

    private static final Log log = LogFactory.getLog(RpcClient.class);

    /** * Response message *@since0.0.4 * /
    private CalculateResponse response;

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf byteBuf = (ByteBuf)msg;
        byte[] bytes = new byte[byteBuf.readableBytes()];
        byteBuf.readBytes(bytes);

        this.response = JsonBs.deserializeBytes(bytes, CalculateResponse.class);
        log.info("[Client] response is :{}", response);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        // If you don't get a response, I don't know why.
        // If it is not closed, it will always be blocked.
        ctx.flush();
        ctx.close();
    }

    public CalculateResponse getResponse(a) {
        returnresponse; }}Copy the code

summary

In order to facilitate learning, the above source code has been open source:

github.com/houbb/rpc

I hope this article is helpful to you. If you like it, please click to collect and forward a wave.

I am an old horse, looking forward to meeting with you next time.