An overview of the


In li Linfeng’s Netty codec framework analysis, various decoders are introduced and combinations are recommended

LengthFieldBasedFrameDecoder
ByteToMessageDecoderCopy the code

These two decoders process business messages. But sometimes inheritance is chosen for flexibility

ByteToMessageDecoderCopy the code

To handle business messages, but directly inherit ByteToMessageDecoder, you need to handle the half-packet problem yourself. In Li linfeng’s Netty Definitive Guide, there is no description of how to customize the decoder to handle half packet messages. This will be described below.

There are at least two things you need to know before reading this article

Netty ByteBuf class API usage 2, what is the TCP half packet

Although JAVA NIO also has a ByteBuffer class, in Netty programs, most of the direct use of Netty ByteBuf class, it wraps more user-friendly interface, reduce the difficulty of using buffer class.

I have written several previous articles on handling half-packet messages, which you can refer to


Custom message protocols


Currently, custom message protocols mostly use the first four bytes of the message to store the length of the message, in the following format

len: Indicates the length of a message, usually used4Bytes save head: the message header body: the message contentCopy the code

Each request for business data, regardless of its size, is represented using the above message format.

Pay attention to

In a real project, the message format may add some flags, such as start flag, end flag, message sequence number, message protocol type (JSON or binary, etc.), which are not included here for the sake of description.


Custom decoders handle half packets of data


As described above, inherit ByteToMessageDecoder class directly and overwrite its decode method at the same time. The complete implementation code is as follows

Server code

package nettyinaction.encode.lengthfield.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

public class SocketServer {
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup parentGroup = new NioEventLoopGroup();
        EventLoopGroup childGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(parentGroup, childGroup)
                    .channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new SocketServerInitializer());

            ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();
            channelFuture.channel().closeFuture().sync();
        }
        finally{ parentGroup.shutdownGracefully(); childGroup.shutdownGracefully(); }}}package nettyinaction.encode.lengthfield.server;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;


public class SocketServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new SelfDefineEncodeHandler());
        pipeline.addLast(newBusinessServerHandler()); }}package nettyinaction.encode.lengthfield.server;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;

public class SelfDefineEncodeHandler extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf bufferIn, List<Object> out) throws Exception {
        if (bufferIn.readableBytes() < 4) {
            return;
        }

        int beginIndex = bufferIn.readerIndex();
        int length = bufferIn.readInt();

        if ((bufferIn.readableBytes()+1) < length) {
            bufferIn.readerIndex(beginIndex);
            return;
        }

        bufferIn.readerIndex(beginIndex + 4 + length);

        ByteBuf otherByteBufRef = bufferIn.slice(beginIndex, 4+ length); otherByteBufRef.retain(); out.add(otherByteBufRef); }}package nettyinaction.encode.lengthfield.server;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class BusinessServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf)msg;
        int length = buf.readInt();
        assert length == (8);

        byte[] head = new byte[4];
        buf.readBytes(head);
        String headString = new String(head);
        assert  "head".equals(headString);

        byte[] body = new byte[4];
        buf.readBytes(body);
        String bodyString = new String(body);
        assert  "body".equals(bodyString); }}Copy the code

Client code

package nettyinaction.encode.lengthfield.client;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

public class SocketClient {
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();

        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup)
                    .channel(NioSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .handler(new SocketClientInitializer());

            ChannelFuture channelFuture = bootstrap.connect("localhost".8899).sync();
            channelFuture.channel().closeFuture().sync();
        }
        finally{ eventLoopGroup.shutdownGracefully(); }}}package nettyinaction.encode.lengthfield.client;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;

public class SocketClientInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(newSocketClientHandler()); }}package nettyinaction.encode.lengthfield.client;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class SocketClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        UnpooledByteBufAllocator allocator = new UnpooledByteBufAllocator(false);
        ByteBuf buffer = allocator.buffer(20);
        buffer.writeInt(8);
        buffer.writeBytes("head".getBytes());
        buffer.writeBytes("body".getBytes()); ctx.writeAndFlush(buffer); }}Copy the code

Once started, the client sends a message of length 8 to the server, which first decodes the message using the SelfDefineEncodeHandler class to handle the half-packet problem. If the message is a valid complete message, when SelfDefineEncodeHandler processes the message, it will forward the message to The BusinessServerHandler for processing. The BusinessServerHandler simply performs validation to determine whether the message content meets the expectation.

Run the above code and, as expected, it reads and parses the message correctly.

In this example, the core class is SelfDefineEncodeHandler. There is a lot of skill in it, and to understand each line of code in it, you need to analyze it in two cases, namely unpacking and sticky packing

The following two small tests are respectively conducted by unpacking and sticky packing to verify whether SelfDefineEncodeHandler can normally handle the half-packet problem.


Unpacking test


First adjust the code in the channelActive method of the SocketClientHandler class to make the body dozens of times larger to force TCP to send a few requests to the server and see if the SelfDefineEncodeHandler on the server can handle it properly.

UnpooledByteBufAllocator allocator = new UnpooledByteBufAllocator(false);
ByteBuf buffer = allocator.buffer(20);
buffer.writeInt(1604);
buffer.writeBytes("head".getBytes());
String longMsgBody = "";
for (int i = 0; i < 400; i++) {
    longMsgBody = longMsgBody + "body";
}
buffer.writeBytes(longMsgBody.getBytes());

 ctx.writeAndFlush(buffer);Copy the code

Using a for loop, set the length of the message body to 1600, plus the head of length 4, for a total message length of 1604.

Then adjust the code of the server class SelfDefineEncodeHandler and add three lines of code. The first line of code adds a class variable, count, to count the number of calls to the decode method

private static int count = 0;Copy the code

You then add three lines of code to the decode method

System.out.println("decode call count="+ ++count);
System.out.println("bufferIn.readableBytes()="+bufferIn.readableBytes());
System.out.println("beginIndex="+beginIndex);Copy the code

Print out the size of count and buffering.readableBytes () as well as beginIndex

Finally added in the BusinessServerHandler class

private static int count = 0;Copy the code

Member variables and in the channelRead method

System.out.println("BusinessServerHandler call count="+ ++count);Copy the code

Run the code and print the following

decode call count=1
bufferIn.readableBytes()=1024
beginIndex=0

decode call count=2
bufferIn.readableBytes()=1608
beginIndex=0

BusinessServerHandler call count=1Copy the code

This result shows that although the client sends only one message, the underlying TCP sends two packets to the server. The first packet sends 1024 bytes, and the rest of the message is sent to the server only after a subsequent request.

Although the decode method is called twice, the first read is incomplete, so ByteToMessageDecoder will quietly wait for the arrival of another package, the second read complete message, then forward the message to the BusinessServerHandler class, from the printed results. The channelRead method of the BusinessServerHandler class is called only once.

We know that the decode method of the SelfDefineEncodeHandler class can handle unpacking, but how does it work? Now let’s go back and take a closer look at the code in the decode method.

Part 1 Code

if (bufferIn.readableBytes() < 4) {
            return;
}Copy the code

If the number of bytes received is less than 4 bytes, or even the contents of the message length field are incomplete, return directly.


Part II Code

 int beginIndex = bufferIn.readerIndex();
 int length = bufferIn.readInt();

 if ((bufferIn.readableBytes()+1) < length) {
      bufferIn.readerIndex(beginIndex);
      return;
 }Copy the code

For unpacking scenarios, since the complete message has not been read,(bufferin.readableBytes ()+1) Will be less than Length, reset bufferIn’s readerIndex to 0, and exit,ByteToMessageDecoder will wait for the next packet to arrive.

Since readerIndex is reset to 0 in the first call,beginIndex is still 0 when the decode method is called a second time.


Part III Code

bufferIn.readerIndex(beginIndex + 4 + length);Copy the code

Set readerIndex to maximum. First of all, the code can execute here, and for the unpacking scenario, it has read a valid complete message. At this point, the ByteToMessageDecoder class needs to be notified that the data in bufferIn has been read and that the decode method is no longer called. The bottom layer of the ByteToMessageDecoder class determines whether it has finished reading based on the bufferin.isreadable () method. The bufferin.isreadable () method will return false only if readerIndex is set to maximum.


Part iv Code

ByteBuf otherByteBufRef = bufferIn.slice(beginIndex, 4 + length);
otherByteBufRef.retain();
out.add(otherByteBufRef);Copy the code

When the decode method after the execution, will release the bufferIn this buffer, if will be released after operation of bufferIn passed to the next processor, once the next processor called bufferIn way to read or write, will quote us IllegalReferenceCountExcept immediately Abnormal ion.

So after slice, a retain operation must be added that increments the bufferIn reference counter by 1 so that the ByteToMessageDecoder will hold bufferIn without releasing it.


Stick pack test


Start by changing the implementation of the channelActive method in the SocketClientHandler class

for (int i = 0; i < 20; i++) {
    UnpooledByteBufAllocator allocator = new UnpooledByteBufAllocator(false);
    ByteBuf buffer = allocator.buffer(20);
    buffer.writeInt(8);
    buffer.writeBytes("head".getBytes());
    buffer.writeBytes("body".getBytes());

    ctx.writeAndFlush(buffer);
}Copy the code

The client sends 20 requests to the server.

Comment out the SocketServerInitializer class

pipeline.addLast(new SelfDefineEncodeHandler());Copy the code

Code so that the request does not go SelfDefineEncodeHandler decoder.

Run the code and the result is as follows

BusinessServerHandler call count=1Copy the code

Note The client sends sticky packets, and the server receives only one request. Now adjust the code back, go to the SelfDefineEncodeHandler decoder, and run the code as follows

decode call count=1
bufferIn.readableBytes()=240
beginIndex=0
BusinessServerHandler call count=1

decode call count=2
bufferIn.readableBytes()=228
beginIndex=12
BusinessServerHandler call count=2

decode call count=3
bufferIn.readableBytes()=216
beginIndex=24
BusinessServerHandler call count=3

decode call count=4
bufferIn.readableBytes()=204
beginIndex=36
BusinessServerHandler call count=4

decode call count=5
bufferIn.readableBytes()=192
beginIndex=48
BusinessServerHandler call count=5

decode call count=6
bufferIn.readableBytes()=180
beginIndex=60
BusinessServerHandler call count=6

decode call count=7
bufferIn.readableBytes()=168
beginIndex=72
BusinessServerHandler call count=7

decode call count=8
bufferIn.readableBytes()=156
beginIndex=84
BusinessServerHandler call count=8

decode call count=9
bufferIn.readableBytes()=144
beginIndex=96
BusinessServerHandler call count=9

decode call count=10
bufferIn.readableBytes()=132
beginIndex=108
BusinessServerHandler call count=10

decode call count=11
bufferIn.readableBytes()=120
beginIndex=120
BusinessServerHandler call count=11

decode call count=12
bufferIn.readableBytes()=108
beginIndex=132
BusinessServerHandler call count=12

decode call count=13
bufferIn.readableBytes()=96
beginIndex=144
BusinessServerHandler call count=13

decode call count=14
bufferIn.readableBytes()=84
beginIndex=156
BusinessServerHandler call count=14

decode call count=15
bufferIn.readableBytes()=72
beginIndex=168
BusinessServerHandler call count=15

decode call count=16
bufferIn.readableBytes()=60
beginIndex=180
BusinessServerHandler call count=16

decode call count=17
bufferIn.readableBytes()=48
beginIndex=192
BusinessServerHandler call count=17

decode call count=18
bufferIn.readableBytes()=36
beginIndex=204
BusinessServerHandler call count=18

decode call count=19
bufferIn.readableBytes()=24
beginIndex=216
BusinessServerHandler call count=19

decode call count=20
bufferIn.readableBytes()=12
beginIndex=228
BusinessServerHandler call count=20
Copy the code

The results are as expected, with the client sending 20 times and the channelRead of the BusinessServerHandler class on the server executing 20 times. How does the SelfDefineEncodeHandler class do this? I still have to go back and look at the decode method.

Part 1 Code

if (bufferIn.readableBytes() < 4) {
            return;
}Copy the code

If the number of bytes received is less than 4 bytes, or even the contents of the message length field are incomplete, return directly.


Part II Code

 int beginIndex = bufferIn.readerIndex();
 int length = bufferIn.readInt();

 if ((bufferIn.readableBytes()+1) < length) {
      bufferIn.readerIndex(beginIndex);
      return;
 }Copy the code

Since the client sent sticky packets, the decode method will receive a large message that aggregates multiple business messages, so (bufferin.readableBytes ()+1) must be greater than Length and bufferIn’s readerIndex will not be reset. However, each time the decode method is executed,beginIndex will be incremented by 12, i.e. (Length +4).


Part III Code

bufferIn.readerIndex(beginIndex + 4 + length);Copy the code

For sticky cases, instead of increasing the readerIndex to the highest, this line of code moves the readerIndex back (Length +4) so that the beginIndex increases (length+4).


Part iv Code

ByteBuf otherByteBufRef = bufferIn.slice(beginIndex, 4 + length);
otherByteBufRef.retain();
out.add(otherByteBufRef);Copy the code

Slice operation, the purpose of which is to extract a valid business message from a large message.


Reference article