Netty – based custom protocol communication protocol

1. Definition of communication protocol

field Number of bytes occupied describe
The frame header 2 bytes The value is fixed at 0x55 0xAA
Long degrees 2 bytes Length = command word + argument + checksum, excluding frame header and length bytes
Life has 1 bytes 0 Heartbeat, 1 authentication, 2 Obtaining information
And the number 0~65535 bytes Business data
The checksum 2 bytes Checksum = Frame header + length + command word + sum of bytes of arguments

Framework functions

  1. heartbeat

  2. TCP half packet, sticky packet processing

  3. IP filtering

  4. Log print

  5. User-defined protocol resolution

Business description

(1) The Netty protocol stack client sends a handshake request message carrying authentication information.

(2) The Netty protocol stack server verifies the validity of the handshake request message. After the verification succeeds, the Netty protocol stack server returns a handshake reply message indicating that the login succeeds.

(3) After the link is successfully established, the client sends heartbeat messages and the client sends service messages.

(6) The server responds to heartbeat and business messages;

(7) When the server exits, the server closes the connection, and the client senses that the other party closes the connection and passively closes the client connection.

Complete code download address

Code screenshots

Client startup code

package com.king.netty.core.client;

import com.king.netty.core.DataFrameDecoder;
import com.king.netty.core.DataFrameEncoder;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

/ * * *@author King
 * @date2021/7/14 * /
public class NettyClient {

    public static void main(String[] args) throws InterruptedException {
        startServer();
    }

    static void startServer(a) throws InterruptedException {
        NioEventLoopGroup group = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        bootstrap
                // Set the thread group
                .group(group)
                // Set to NIO mode
                .channel(NioSocketChannel.class)
                // Set all channelHandlers in the pipeline
                // The inbound channelHandler needs to be ordered
                // The outbound channelHandler needs to be in order
                .handler(new ClientHandlerInit());
        bootstrap.connect("127.0.0.1".8888).sync();
    }

    static class ClientHandlerInit extends ChannelInitializer<SocketChannel>{
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            // Logs are printed
            pipeline.addLast(new LoggingHandler(LogLevel.INFO));
            / / LengthFieldBasedFrameDecoder used to solve the problem of TCP stick pack half a pack
            pipeline.addLast(new LengthFieldBasedFrameDecoder(
                    65535.// maxFrameLength Maximum length of a message
                    2.// lengthFieldOffset specifies the offset of the length field, indicating that the length field is skipped after a specified number of bytes
                    2.// lengthFieldLength Specifies the field that records the length of the frame data, i.e. the length of the length field itself
                    0.// lengthAdjustment a modified value of length, which can be positive or negative. After reading the length value N of the packet, Netty considers that the following N bytes need to be read, but depending on the actual situation, it may need to increase or decrease the N value. How much you increase, how much you decrease, you write it in this parameter
                    2              / / initialBytesToStrip from skip the number of bytes in the data frame, after getting a complete packet said, throw away how many bytes in the packet, is the subsequent actual need of business data.
            ));
            // Custom protocol decoder
            pipeline.addLast(new DataFrameDecoder());
            // Custom protocol encoder
            pipeline.addLast(new DataFrameEncoder());
            // The handler that handles authentication requests
            pipeline.addLast(new AuthorizationRequestHandler());
            // The heartbeat handler
            pipeline.addLast(new HeartBeatRequestHandler());
            // Client business handler
            pipeline.addLast(newClientBusinessHandler()); }}}Copy the code

The client requests authentication Handler code

package com.king.netty.core.client;

import com.king.netty.core.DataFrame;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;

/ * * *@author King
 * @date2021/7/14 * /
public class AuthorizationRequestHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // Initiate an authentication request after the connection is successful
        ctx.writeAndFlush(DataFrame.getAuthorizationDataFrame());
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        DataFrame dataFrame = (DataFrame) msg;
        // Process the authentication response
        if (dataFrame.getCmd() == DataFrame.CMD_AUTHORIZATION) {
            byte[] params = dataFrame.getParams();
            if (! "success".equals(new String(params))){
                // Failed to authenticate and closed the connectionReferenceCountUtil.release(msg); ctx.close(); }}// The authentication succeeds, and the message continues
        // Non-authenticated responses are forwarded to subsequent servicesctx.fireChannelRead(msg); }}Copy the code

Client heartbeat Handler code

package com.king.netty.core.client;

import com.king.netty.core.DataFrame;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;

import java.util.concurrent.TimeUnit;

/ * * *@author King
 * @date2021/7/14 * /
public class HeartBeatRequestHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        DataFrame dataFrame = (DataFrame) msg;
        switch (dataFrame.getCmd()){
            // If it is a heartbeat response, release it because subsequent business handlers care
            case DataFrame.CMD_HEART_BEAT:
                ReferenceCountUtil.release(msg);
                break;
            // If the response is successfully authenticated, the heartbeat is periodically sent
            case DataFrame.CMD_AUTHORIZATION:
                // Use the netty built-in task processor to send a heartbeat every 10 seconds
                ctx.executor().scheduleAtFixedRate(() -> {
                    ctx.writeAndFlush(DataFrame.getHeartBeatDataFrame());
                }, 0.10, TimeUnit.SECONDS);
                ctx.fireChannelRead(msg);
                break;
            default:
                // Pass the message backwards and let the business handler handle it
                ctx.fireChannelRead(msg);
                break; }}}Copy the code

Client business Handler code

package com.king.netty.core.client;

import com.king.netty.core.DataFrame;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/ * * *@author King
 * @date2021/7/14 * /
public class ClientBusinessHandler extends ChannelInboundHandlerAdapter {

    public static final Logger logger = LoggerFactory.getLogger(ClientBusinessHandler.class);

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        DataFrame dataFrame = (DataFrame) msg;
        if (dataFrame.getCmd() == DataFrame.CMD_AUTHORIZATION) {
            // Send a service request
            ctx.writeAndFlush(new DataFrame(DataFrame.CMD_GET_INFO, "which language is the best ?".getBytes()));
        }else {
            // Prints messages sent by the server
            logger.debug("receive message: "+ dataFrame); } ReferenceCountUtil.release(msg); }}Copy the code

Server startup code

package com.king.netty.core.server;

import com.king.netty.core.DataFrameDecoder;
import com.king.netty.core.DataFrameEncoder;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.ipfilter.IpFilterRule;
import io.netty.handler.ipfilter.IpFilterRuleType;
import io.netty.handler.ipfilter.RuleBasedIpFilter;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;

import java.net.InetSocketAddress;

/ * * *@author King
 * @date2021/7/14 * /
@Component
public class NettyServer implements InitializingBean.DisposableBean {

    private boolean started;
    private Channel channel;
    private NioEventLoopGroup parentGroup;
    private NioEventLoopGroup childGroup;

    @Override
    public void destroy(a) throws Exception {
        // Spring calls stop to release the server when the object is destroyed
        if(started){ stopServer(); }}@Override
    public void afterPropertiesSet(a) throws Exception {
        // After spring initializes the object, it invokes the start method to start the service
        if (started){
            return;
        }
        startServer();
    }

    void startServer(a) throws InterruptedException {
        this.parentGroup = new NioEventLoopGroup();
        this.childGroup = new NioEventLoopGroup();
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap
                // Set the thread group
                .group(parentGroup, childGroup)
                // Set to NIO mode
                .channel(NioServerSocketChannel.class)
                // Set the TCP sync queue size to prevent flood attacks
                .childOption(ChannelOption.SO_BACKLOG, 1024)
                // Set all channelHandlers in the pipeline
                // The inbound channelHandler needs to be ordered
                // The outbound channelHandler needs to be in order
                .childHandler(new ServerHandlerInit());
        this.channel = serverBootstrap.bind(8888).sync().channel();
        started = true;
    }

    void stopServer(a){
        try{
            parentGroup.shutdownGracefully();
            childGroup.shutdownGracefully();
            channel.closeFuture().syncUninterruptibly();
        }finally {
            this.parentGroup = null;
            this.childGroup = null;
            this.channel = null;
            started = false; }}static class ServerHandlerInit extends ChannelInitializer<SocketChannel>{
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            // Logs are printed
            pipeline.addLast(new LoggingHandler(LogLevel.INFO));
            / / IP filter
            pipeline.addLast(new RuleBasedIpFilter(new IpFilterRule() {
                @Override
                public boolean matches(InetSocketAddress remoteAddress) {
                    // Custom IP address interceptor. IP addresses that do not start with 127 are not allowed to connect
                    return ! remoteAddress.getHostName().startsWith("127");
                }
                @Override
                public IpFilterRuleType ruleType(a) {
                    returnIpFilterRuleType.REJECT; }}));/ / LengthFieldBasedFrameDecoder used to solve the problem of TCP stick pack half a pack
            pipeline.addLast(new LengthFieldBasedFrameDecoder(
                    65535.// maxFrameLength Maximum length of a message
                    2.// lengthFieldOffset specifies the offset of the length field, indicating that the length field is skipped after a specified number of bytes
                    2.// lengthFieldLength Specifies the field that records the length of the frame data, i.e. the length of the length field itself
                    0.// lengthAdjustment a modified value of length, which can be positive or negative. After reading the length value N of the packet, Netty considers that the following N bytes need to be read, but depending on the actual situation, it may need to increase or decrease the N value. How much you increase, how much you decrease, you write it in this parameter
                    2              / / initialBytesToStrip from skip the number of bytes in the data frame, after getting a complete packet said, throw away how many bytes in the packet, is the subsequent actual need of business data.
            ));
            // Set the heartbeat timeout period to 30 seconds. If no heartbeat is received within 30 seconds, a ReadTimeoutException will be thrown
            pipeline.addLast(new ReadTimeoutHandler(30));
            // Custom protocol decoder
            pipeline.addLast(new DataFrameDecoder());
            // Custom protocol encoder
            pipeline.addLast(new DataFrameEncoder());
            // Authentication processing
            pipeline.addLast(new AuthorizationResponseHandler());
            // Heartbeat processing
            pipeline.addLast(new HeartBeatResponseHandler());
            // Business processing handler
            pipeline.addLast(newServerBusinessHandler()); }}}Copy the code

Server authentication processing Handler

package com.king.netty.core.server;

import com.king.netty.core.DataFrame;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;

/ * * *@author King
 * @date2021/7/14 * /
public class AuthorizationResponseHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        DataFrame dataFrame = (DataFrame) msg;
        if (dataFrame.getCmd() == DataFrame.CMD_AUTHORIZATION) {
            String auth = "{\"username\":\"test\", \"password\":\"abcdef\"}";
            byte[] params = dataFrame.getParams();
            if (auth.equals(new String(params))){
                // The authentication succeeded
                ctx.writeAndFlush(new DataFrame(dataFrame.getCmd(), "success".getBytes()));
            }else {
                // Authentication failed
                ctx.writeAndFlush(new DataFrame(dataFrame.getCmd(), "fail".getBytes()));
            }
            // Release the message
            ReferenceCountUtil.release(msg);
        }else {
            // Non-authenticated requests are forwarded to subsequent servicesctx.fireChannelRead(msg); }}}Copy the code

Server heartbeat processing Handler

package com.king.netty.core.server;

import com.king.netty.core.DataFrame;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.ReadTimeoutException;
import io.netty.util.ReferenceCountUtil;

/ * * *@author King
 * @date2021/7/14 * /
public class HeartBeatResponseHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        DataFrame dataFrame = (DataFrame) msg;
        // If it is a heartbeat request, release it because subsequent business handlers care
        if (dataFrame.getCmd() == DataFrame.CMD_HEART_BEAT) {
            ctx.writeAndFlush(DataFrame.getHeartBeatDataFrame());
            ReferenceCountUtil.release(msg);
        } else {// Pass the message backwards and let the business handler handle itctx.fireChannelRead(msg); }}@Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        if (cause instanceof ReadTimeoutException){
            // Disconnect the client
            ctx.close();
            return;
        }
        super.exceptionCaught(ctx, cause); }}Copy the code

Server service processing Handler

package com.king.netty.core.server;

import com.king.netty.core.DataFrame;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/ * * *@author King
 * @date2021/7/14 * /
public class ServerBusinessHandler extends ChannelInboundHandlerAdapter {

    public static final Logger logger = LoggerFactory.getLogger(ServerBusinessHandler.class);

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        DataFrame dataFrame = (DataFrame) msg;
        logger.debug("receive message: " + dataFrame);

        // Return client data
        DataFrame response = doBusiness(dataFrame);
        ctx.writeAndFlush(response);
    }

    private DataFrame doBusiness(DataFrame dataFrame){
        // Take care of your business
        // todo
        // Respond to the client
        return new DataFrame(dataFrame.getCmd(), "java is the best language".getBytes()); }}Copy the code

Protocol definition code

package com.king.netty.core;

import lombok.Data;

/ * * *@author King
 * @date2021/7/14 * /
@Data
public class DataFrame {

    public static final byte CMD_HEART_BEAT = 0;
    public static final byte CMD_AUTHORIZATION = 1;
    public static final byte CMD_GET_INFO = 2;

    /** * Frame header length Life parameter checksum * 0x55 0xAA 2byte 1byte 0~1476bytes 2bytes ** Length = command word + parameter + checksum, excluding frame header and length bytes; * Checksum = Frame header + length + command word + sum of bytes of arguments. * * /

    public static final byte[] HEADER = new byte[] {0b01010101, (byte) 0b10101010};

    private byte cmd;

    private byte[] params;

    private int crc;

    public DataFrame(byte cmd, byte[] params, int crc) {
        this.cmd = cmd;
        this.params = params;
        this.crc = crc;
    }

    public DataFrame(byte cmd, byte[] params) {
        this.cmd = cmd;
        this.params = params;
        this.crc = getCrc();
    }

    public boolean checkCrc(a){
        return getCrc() == this.crc;
    }

    public int getLength(a) {
        // Length = command word + argument + checksum, excluding frame header and length bytes;
        return  1 + params.length + 2;
    }

    public int getCrc(a){
        // Checksum = frame header + length + command word + sum of bytes of arguments.
        int crc = 0;
        / / frame head
        crc += 0b01010101;
        crc += 0b10101010;
        / / the length
        crc += getLength();
        / / parameters and
        for (byte b: params){
            crc += (b & 0xFF);
        }
        return crc;
    }

    public static DataFrame getHeartBeatDataFrame(a){
        return new DataFrame(DataFrame.CMD_HEART_BEAT, new byte[] {}); }public static DataFrame getAuthorizationDataFrame(a){
        String msg = "{\"username\":\"test\", \"password\":\"abcdef\"}";
        return new DataFrame(DataFrame.CMD_AUTHORIZATION, msg.getBytes());
    }

    @Override
    public String toString(a) {
        return "DataFrame{" +
                "cmd=" + cmd +
                ", params=" + new String(params) +
                '} '; }}Copy the code

Protocol decoder

package com.king.netty.core;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;

/ * * *@author King
 * @date2021/7/14 * /
public class DataFrameDecoder extends ByteToMessageDecoder {

    /** * Frame header length Life parameter checksum * 0x55 0xAA 2byte 1byte 0~1476bytes 2bytes ** Length = command word + parameter + checksum, excluding frame header and length bytes; * Checksum = Frame header + length + command word + sum of bytes of arguments. * * /

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        / / length
        int length = in.readShort();
        / / command
        byte cmd = in.readByte();
        / / parameters
        byte[] params = new byte[length-3];
        in.readBytes(params);
        / / the checksum
        int crc = in.readShort();
        DataFrame dataFrame = new DataFrame(cmd, params, crc);
        // Compute the checksum
        if (dataFrame.checkCrc()){
            // Add parsed data to the list and pass it to the subsequent channelHandlerout.add(dataFrame); }; }}Copy the code

Protocol encoder

package com.king.netty.core;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

/ * * *@author King
 * @date2021/7/14 * /
public class DataFrameEncoder extends MessageToByteEncoder<DataFrame> {

    @Override
    protected void encode(ChannelHandlerContext ctx, DataFrame msg, ByteBuf out) throws Exception {
        // Write the frame header
        out.writeBytes(DataFrame.HEADER);
        // Write the length
        out.writeShort(msg.getLength());
        // Write the command
        out.writeByte(msg.getCmd());
        / / number of refs
        out.writeBytes(msg.getParams());
        / / the checksumout.writeShort(msg.getCrc()); }}Copy the code