Overview
In Li Linfeng's analysis of the Netty codec framework, various decoders are introduced, and the recommended approach is to combine LengthFieldBasedFrameDecoder and ByteToMessageDecoder to process business messages.
Sometimes, however, for the sake of flexibility you choose to extend ByteToMessageDecoder directly to handle business messages. In that case you have to handle the half-packet problem yourself. Li Linfeng's Netty Definitive Guide does not describe how to write a custom decoder that handles half-packet messages, so this article does.
There are at least two things you need to know before reading this article
1. How to use the API of Netty's ByteBuf class
2. What a TCP half packet is
Although Java NIO also has a ByteBuffer class, Netty programs mostly use Netty's own ByteBuf class directly. It wraps the buffer in a friendlier interface, which makes buffers easier to work with.
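To make the "friendlier interface" point concrete, here is a minimal sketch of the extra step the JDK class needs. It uses only java.nio.ByteBuffer; the class name ByteBufferFlipSketch and the method writeThenRead are made up for this illustration. ByteBuffer keeps a single position for both reading and writing, so flip() is required between the two, whereas ByteBuf tracks readerIndex and writerIndex separately and needs no such call.

```java
import java.nio.ByteBuffer;

// Illustration only: plain JDK ByteBuffer, not Netty's ByteBuf.
class ByteBufferFlipSketch {

    static int writeThenRead(int value) {
        ByteBuffer buffer = ByteBuffer.allocate(8);
        buffer.putInt(value);   // writing moves the single position to 4
        buffer.flip();          // limit = 4, position = 0: switch from writing to reading
        return buffer.getInt(); // reads the 4 bytes that were just written
    }

    public static void main(String[] args) {
        System.out.println(writeThenRead(8)); // prints 8
    }
}
```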
I have written several earlier articles on handling half-packet messages, which you can refer to
- Netty Definitive Guide Notes – Handling half-packet problems with the delimiter decoder
- Netty Definitive Guide Notes – Handling half-packet problems for messages that end in a carriage return and line feed
Custom message protocols
Most custom message protocols today use the first four bytes of a message to store the message length, in the following format
len: the length of the message, usually stored in 4 bytes
head: the message header
body: the message content
In the examples below, len counts the head and body bytes but not the 4 bytes of the length field itself.
Every request that carries business data, regardless of its size, uses the message format above.
Note
In a real project, the message format may include extra fields such as a start flag, an end flag, a message sequence number, or a message protocol type (JSON, binary, and so on). They are left out here to keep the description simple.
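The len/head/body layout can be sketched with plain JDK classes. This is only an illustration under stated assumptions: FrameLayoutSketch and encode are hypothetical names, java.nio.ByteBuffer stands in for Netty's ByteBuf so that the snippet runs without Netty on the classpath, and the length field is assumed to count the head and body bytes only, as in the 8-byte example message used in this article.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration; not part of the article's Netty code.
class FrameLayoutSketch {

    // Builds one frame: 4-byte length, then head, then body.
    // The length field counts head + body only, not its own 4 bytes.
    static byte[] encode(String head, String body) {
        byte[] headBytes = head.getBytes(StandardCharsets.US_ASCII);
        byte[] bodyBytes = body.getBytes(StandardCharsets.US_ASCII);
        int length = headBytes.length + bodyBytes.length;
        ByteBuffer buffer = ByteBuffer.allocate(4 + length); // big-endian by default
        buffer.putInt(length);
        buffer.put(headBytes);
        buffer.put(bodyBytes);
        return buffer.array();
    }

    public static void main(String[] args) {
        byte[] frame = encode("head", "body");
        System.out.println("frame size = " + frame.length);                      // 12
        System.out.println("length field = " + ByteBuffer.wrap(frame).getInt()); // 8
    }
}
```

The frame is 12 bytes long while the length field holds 8, which is why the decoder later advances by length + 4 for every message.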
Handling half-packet data with a custom decoder
As described above, extend the ByteToMessageDecoder class directly and override its decode method. The complete implementation is as follows
Server code
package nettyinaction.encode.lengthfield.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

public class SocketServer {
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup parentGroup = new NioEventLoopGroup();
        EventLoopGroup childGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(parentGroup, childGroup)
                    .channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new SocketServerInitializer());
            ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();
            channelFuture.channel().closeFuture().sync();
        } finally {
            parentGroup.shutdownGracefully();
            childGroup.shutdownGracefully();
        }
    }
}

package nettyinaction.encode.lengthfield.server;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;

public class SocketServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        // the decoder must come before the business handler
        pipeline.addLast(new SelfDefineEncodeHandler());
        pipeline.addLast(new BusinessServerHandler());
    }
}

package nettyinaction.encode.lengthfield.server;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;

public class SelfDefineEncodeHandler extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf bufferIn, List<Object> out) throws Exception {
        // Part 1: wait until the 4-byte length field is complete
        if (bufferIn.readableBytes() < 4) {
            return;
        }
        // Part 2: read the length; roll back if head + body have not fully arrived.
        // After readInt(), readableBytes() counts only the bytes that follow the
        // length field, so it is compared with length directly.
        int beginIndex = bufferIn.readerIndex();
        int length = bufferIn.readInt();
        if (bufferIn.readableBytes() < length) {
            bufferIn.readerIndex(beginIndex);
            return;
        }
        // Part 3: mark the whole message (length field included) as consumed
        bufferIn.readerIndex(beginIndex + 4 + length);
        // Part 4: pass a retained slice of the message to the next handler
        ByteBuf otherByteBufRef = bufferIn.slice(beginIndex, 4 + length);
        otherByteBufRef.retain();
        out.add(otherByteBufRef);
    }
}

package nettyinaction.encode.lengthfield.server;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class BusinessServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        try {
            // assert statements only run when the JVM is started with -ea
            int length = buf.readInt();
            assert length == 8;
            byte[] head = new byte[4];
            buf.readBytes(head);
            String headString = new String(head);
            assert "head".equals(headString);
            byte[] body = new byte[4];
            buf.readBytes(body);
            String bodyString = new String(body);
            assert "body".equals(bodyString);
        } finally {
            // this is the last handler to see the buffer, so it releases it
            buf.release();
        }
    }
}
Client code
package nettyinaction.encode.lengthfield.client;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

public class SocketClient {
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            // a later handler(...) call replaces an earlier one,
            // so only SocketClientInitializer takes effect here
            bootstrap.group(eventLoopGroup)
                    .channel(NioSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .handler(new SocketClientInitializer());
            ChannelFuture channelFuture = bootstrap.connect("localhost", 8899).sync();
            channelFuture.channel().closeFuture().sync();
        } finally {
            eventLoopGroup.shutdownGracefully();
        }
    }
}

package nettyinaction.encode.lengthfield.client;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;

public class SocketClientInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new SocketClientHandler());
    }
}

package nettyinaction.encode.lengthfield.client;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class SocketClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // the server sends nothing back in this example
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        UnpooledByteBufAllocator allocator = new UnpooledByteBufAllocator(false);
        ByteBuf buffer = allocator.buffer(20);
        buffer.writeInt(8); // len = head (4 bytes) + body (4 bytes)
        buffer.writeBytes("head".getBytes());
        buffer.writeBytes("body".getBytes());
        ctx.writeAndFlush(buffer);
    }
}
Once started, the client sends a message of length 8 to the server. The server first passes it through SelfDefineEncodeHandler, which decodes it and deals with the half-packet problem. If the message is valid and complete, SelfDefineEncodeHandler forwards it to BusinessServerHandler, which simply checks that the message content matches what is expected.
Run the above code and, as expected, the server reads and parses the message correctly.
In this example the core class is SelfDefineEncodeHandler. It contains quite a few subtleties, and to understand every line you need to analyze it in two scenarios: unpacking (one message split across several reads) and sticky packets (several messages merged into one read).
The following two small tests, one for unpacking and one for sticky packets, verify whether SelfDefineEncodeHandler handles the half-packet problem correctly.
Unpacking test
First adjust the code in the channelActive method of the SocketClientHandler class to make the body hundreds of times larger. This forces TCP to deliver the message to the server in several pieces, so we can see whether SelfDefineEncodeHandler on the server handles it properly.
UnpooledByteBufAllocator allocator = new UnpooledByteBufAllocator(false);
ByteBuf buffer = allocator.buffer(20); // grows automatically as more bytes are written
buffer.writeInt(1604);                 // len = head (4 bytes) + body (1600 bytes)
buffer.writeBytes("head".getBytes());
StringBuilder longMsgBody = new StringBuilder();
for (int i = 0; i < 400; i++) {
    longMsgBody.append("body");
}
buffer.writeBytes(longMsgBody.toString().getBytes());
ctx.writeAndFlush(buffer);
The for loop makes the message body 1600 bytes long. Adding the 4-byte head gives a message length of 1604, and together with the 4-byte length field 1608 bytes go over the wire.
Then adjust the server class SelfDefineEncodeHandler. First add a class variable, count, to count the number of calls to the decode method
private static int count = 0;
Then add three lines to the decode method, right after beginIndex is assigned and before readInt is called
System.out.println("decode call count=" + ++count);
System.out.println("bufferIn.readableBytes()=" + bufferIn.readableBytes());
System.out.println("beginIndex=" + beginIndex);
They print count, bufferIn.readableBytes(), and beginIndex.
Finally, add a member variable to the BusinessServerHandler class
private static int count = 0;
and add the following line to its channelRead method
System.out.println("BusinessServerHandler call count=" + ++count);
Run the code; it prints the following
decode call count=1
bufferIn.readableBytes()=1024
beginIndex=0
decode call count=2
bufferIn.readableBytes()=1608
beginIndex=0
BusinessServerHandler call count=1
This result shows that although the client sends only one message, the data reaches the server in two pieces. The first piece is 1024 bytes, and the rest of the message arrives with the next read.
Although the decode method is called twice, the message is still incomplete on the first call, so ByteToMessageDecoder quietly waits for more data. On the second call the complete message is available and is forwarded to BusinessServerHandler. As the output shows, the channelRead method of BusinessServerHandler is called only once.
So the decode method of the SelfDefineEncodeHandler class can handle unpacking, but how does it work? Let's go back and take a closer look at the code in the decode method.
Part 1 code
if (bufferIn.readableBytes() < 4) {
    return;
}
If fewer than 4 bytes have been received, even the message length field is incomplete, so return immediately.
Part 2 code
int beginIndex = bufferIn.readerIndex();
int length = bufferIn.readInt();
if (bufferIn.readableBytes() < length) {
    bufferIn.readerIndex(beginIndex);
    return;
}
In the unpacking scenario the complete message has not arrived yet, so bufferIn.readableBytes() is less than length. The code resets bufferIn's readerIndex to beginIndex (0 here) and returns, and ByteToMessageDecoder waits for the next piece of data to arrive.
Because readerIndex was reset to 0 in the first call, beginIndex is still 0 when decode is called the second time.
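The check-and-roll-back logic of Part 1 and Part 2 can be tried out without Netty. The following is a minimal sketch, not the article's decoder: HalfPacketSketch, tryDecode and feed are hypothetical names, java.nio.ByteBuffer replaces ByteBuf, and the accumulation that ByteToMessageDecoder performs internally is imitated with flip() and compact(). The remaining bytes are compared with length directly.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical sketch using java.nio.ByteBuffer; not Netty code.
class HalfPacketSketch {

    // Returns one complete frame (length field included), or null if the buffer
    // does not yet hold a whole frame. When null is returned the position is unchanged.
    static byte[] tryDecode(ByteBuffer in) {
        if (in.remaining() < 4) {
            return null;                  // the length field itself is incomplete
        }
        int beginIndex = in.position();
        int length = in.getInt();         // advances the position by 4
        if (in.remaining() < length) {
            in.position(beginIndex);      // roll back and wait for more bytes
            return null;
        }
        byte[] frame = new byte[4 + length];
        in.position(beginIndex);
        in.get(frame);                    // consumes the whole frame
        return frame;
    }

    // Appends the chunks one by one to an accumulation buffer and tries to decode
    // after each chunk. Returns {decode attempts, frames decoded}.
    static int[] feed(byte[][] chunks) {
        ByteBuffer cumulation = ByteBuffer.allocate(4096);
        int attempts = 0;
        int frames = 0;
        for (byte[] chunk : chunks) {
            cumulation.put(chunk);
            cumulation.flip();            // switch to reading
            attempts++;
            if (tryDecode(cumulation) != null) {
                frames++;
            }
            cumulation.compact();         // keep unread bytes, switch back to writing
        }
        return new int[] {attempts, frames};
    }

    public static void main(String[] args) {
        ByteBuffer message = ByteBuffer.allocate(1608);
        message.putInt(1604);
        message.put(new byte[1604]);      // head + body; the content does not matter here
        byte[] all = message.array();
        byte[] first = Arrays.copyOfRange(all, 0, 1024);
        byte[] second = Arrays.copyOfRange(all, 1024, 1608);
        int[] result = feed(new byte[][] {first, second});
        System.out.println("decode attempts=" + result[0] + ", frames=" + result[1]);
        // prints: decode attempts=2, frames=1
    }
}
```

With the 1608-byte message split at byte 1024, the first attempt rolls back and the second one yields the frame, mirroring the two decode calls and the single channelRead call seen in the test output.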
Part 3 code
bufferIn.readerIndex(beginIndex + 4 + length);
Set readerIndex to the end of the message. If execution reaches this line in the unpacking scenario, a valid, complete message has been received. ByteToMessageDecoder now needs to be told that the data in bufferIn has been consumed, so that it stops calling decode. Internally, ByteToMessageDecoder decides whether there is anything left to read by calling bufferIn.isReadable(), which returns false only once readerIndex has caught up with the end of the written data.
Part 4 code
ByteBuf otherByteBufRef = bufferIn.slice(beginIndex, 4 + length);
otherByteBufRef.retain();
out.add(otherByteBufRef);
Once decode has finished and all of the data has been read, ByteToMessageDecoder releases the bufferIn buffer. If a buffer backed by the released bufferIn is passed to the next handler, any read or write that handler performs on it immediately throws IllegalReferenceCountException.
So after slice, a retain call must be added. The slice shares its reference count with bufferIn, so retain increments bufferIn's reference count by 1, and the release performed by ByteToMessageDecoder no longer frees memory that the next handler is still using. The handler that finally consumes the slice is then responsible for releasing it.
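Why the retain call matters can be shown with a toy model of reference counting. This is a deliberately simplified sketch and not Netty's implementation: RefCountSketch, CountedBuffer and nextHandlerCanRead are hypothetical names, and an IllegalStateException stands in for Netty's IllegalReferenceCountException. The one property it models is that a slice shares the counter of the buffer it was cut from.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Toy model of a reference-counted buffer; hypothetical, not Netty's implementation.
class RefCountSketch {

    static final class CountedBuffer {
        private final byte[] data;
        private final AtomicInteger refCnt; // shared between a buffer and its slices

        CountedBuffer(byte[] data) {
            this(data, new AtomicInteger(1));
        }

        private CountedBuffer(byte[] data, AtomicInteger refCnt) {
            this.data = data;
            this.refCnt = refCnt;
        }

        // A slice is a view on the same bytes and shares the same counter.
        CountedBuffer slice() {
            return new CountedBuffer(data, refCnt);
        }

        CountedBuffer retain() {
            refCnt.incrementAndGet();
            return this;
        }

        void release() {
            refCnt.decrementAndGet();
        }

        byte get(int index) {
            if (refCnt.get() <= 0) {
                throw new IllegalStateException("refCnt: 0"); // buffer already freed
            }
            return data[index];
        }
    }

    // Simulates the decoder slicing its input (optionally retaining the slice),
    // the framework releasing the input, and the next handler reading the slice.
    static boolean nextHandlerCanRead(boolean retainSlice) {
        CountedBuffer bufferIn = new CountedBuffer(new byte[] {1, 2, 3});
        CountedBuffer frame = bufferIn.slice();
        if (retainSlice) {
            frame.retain();
        }
        bufferIn.release(); // the input has been fully consumed
        try {
            frame.get(0);
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("without retain: " + nextHandlerCanRead(false)); // false
        System.out.println("with retain: " + nextHandlerCanRead(true));     // true
    }
}
```

In the retained case the counter is still 1 after the read, which corresponds to the release that the consuming handler owes.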
Sticky packet test
Start by changing the implementation of the channelActive method in the SocketClientHandler class
for (int i = 0; i < 20; i++) {
    UnpooledByteBufAllocator allocator = new UnpooledByteBufAllocator(false);
    ByteBuf buffer = allocator.buffer(20);
    buffer.writeInt(8);
    buffer.writeBytes("head".getBytes());
    buffer.writeBytes("body".getBytes());
    ctx.writeAndFlush(buffer);
}
The client sends 20 requests to the server.
In the SocketServerInitializer class, comment out the line
pipeline.addLast(new SelfDefineEncodeHandler());
so that requests bypass the SelfDefineEncodeHandler decoder.
Run the code; the result is as follows
BusinessServerHandler call count=1
This shows that the client's messages were stuck together and the server received them as a single request. Now restore the code so that requests pass through the SelfDefineEncodeHandler decoder again, and run it. The output is as follows
decode call count=1
bufferIn.readableBytes()=240
beginIndex=0
BusinessServerHandler call count=1
decode call count=2
bufferIn.readableBytes()=228
beginIndex=12
BusinessServerHandler call count=2
decode call count=3
bufferIn.readableBytes()=216
beginIndex=24
BusinessServerHandler call count=3
decode call count=4
bufferIn.readableBytes()=204
beginIndex=36
BusinessServerHandler call count=4
decode call count=5
bufferIn.readableBytes()=192
beginIndex=48
BusinessServerHandler call count=5
decode call count=6
bufferIn.readableBytes()=180
beginIndex=60
BusinessServerHandler call count=6
decode call count=7
bufferIn.readableBytes()=168
beginIndex=72
BusinessServerHandler call count=7
decode call count=8
bufferIn.readableBytes()=156
beginIndex=84
BusinessServerHandler call count=8
decode call count=9
bufferIn.readableBytes()=144
beginIndex=96
BusinessServerHandler call count=9
decode call count=10
bufferIn.readableBytes()=132
beginIndex=108
BusinessServerHandler call count=10
decode call count=11
bufferIn.readableBytes()=120
beginIndex=120
BusinessServerHandler call count=11
decode call count=12
bufferIn.readableBytes()=108
beginIndex=132
BusinessServerHandler call count=12
decode call count=13
bufferIn.readableBytes()=96
beginIndex=144
BusinessServerHandler call count=13
decode call count=14
bufferIn.readableBytes()=84
beginIndex=156
BusinessServerHandler call count=14
decode call count=15
bufferIn.readableBytes()=72
beginIndex=168
BusinessServerHandler call count=15
decode call count=16
bufferIn.readableBytes()=60
beginIndex=180
BusinessServerHandler call count=16
decode call count=17
bufferIn.readableBytes()=48
beginIndex=192
BusinessServerHandler call count=17
decode call count=18
bufferIn.readableBytes()=36
beginIndex=204
BusinessServerHandler call count=18
decode call count=19
bufferIn.readableBytes()=24
beginIndex=216
BusinessServerHandler call count=19
decode call count=20
bufferIn.readableBytes()=12
beginIndex=228
BusinessServerHandler call count=20
The results are as expected: the client sends 20 messages, and the channelRead method of the BusinessServerHandler class on the server runs 20 times. How does the SelfDefineEncodeHandler class do this? Again, go back and look at the decode method.
Part 1 code
if (bufferIn.readableBytes() < 4) {
    return;
}
If fewer than 4 bytes have been received, even the message length field is incomplete, so return immediately.
Part 2 code
int beginIndex = bufferIn.readerIndex();
int length = bufferIn.readInt();
if (bufferIn.readableBytes() < length) {
    bufferIn.readerIndex(beginIndex);
    return;
}
Because the client's messages were stuck together, decode receives one large buffer that aggregates multiple business messages. bufferIn.readableBytes() is therefore never less than length, and bufferIn's readerIndex is not reset. Instead, beginIndex grows by 12, i.e. length + 4, on each call to decode.
Part 3 code
bufferIn.readerIndex(beginIndex + 4 + length);
In the sticky packet scenario this line does not move readerIndex to the end of the buffer. It advances readerIndex by length + 4, which is why beginIndex is length + 4 larger on the next call. Since bufferIn is still readable afterwards, ByteToMessageDecoder calls decode again until all 20 messages have been consumed.
Part 4 code
ByteBuf otherByteBufRef = bufferIn.slice(beginIndex, 4 + length);
otherByteBufRef.retain();
out.add(otherByteBufRef);
The slice operation extracts one valid business message from the large buffer.
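The way beginIndex walks through a buffer of stuck-together messages can be reproduced with a small JDK-only sketch. It is an illustration under assumptions rather than the article's decoder: StickyPacketSketch and frameStarts are hypothetical names, java.nio.ByteBuffer replaces ByteBuf, and the explicit while loop plays the role of ByteToMessageDecoder calling decode repeatedly while the buffer is readable.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch using java.nio.ByteBuffer; not Netty code.
class StickyPacketSketch {

    // Walks over a buffer holding several back-to-back frames and records
    // the beginIndex at which each complete frame starts.
    static List<Integer> frameStarts(ByteBuffer in) {
        List<Integer> starts = new ArrayList<>();
        while (in.remaining() >= 4) {
            int beginIndex = in.position();
            int length = in.getInt();
            if (in.remaining() < length) {
                in.position(beginIndex);          // trailing half packet: stop and wait
                break;
            }
            in.position(beginIndex + 4 + length); // jump to the start of the next frame
            starts.add(beginIndex);
        }
        return starts;
    }

    public static void main(String[] args) {
        ByteBuffer stuck = ByteBuffer.allocate(20 * 12);
        for (int i = 0; i < 20; i++) {
            stuck.putInt(8);
            stuck.put("head".getBytes(StandardCharsets.US_ASCII));
            stuck.put("body".getBytes(StandardCharsets.US_ASCII));
        }
        stuck.flip();
        List<Integer> starts = frameStarts(stuck);
        System.out.println(starts.size() + " frames, first starts: " + starts.subList(0, 3));
        // prints: 20 frames, first starts: [0, 12, 24]
    }
}
```

The recorded start offsets 0, 12, 24 and so on match the beginIndex values in the test output.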
Reference articles
- Decoder writing details not covered in Netty’s definitive guide
- Netty codec framework analysis of Netty series
- Netty effectively avoids memory leaks
- Netty High Performance Programming Memo (2)