Just as many standard architectural patterns are supported by various proprietary frameworks, common data-processing patterns are often good candidates for goal realization, saving developers a lot of time and effort. This, of course, applies to the topic of this article: encoding and decoding, or the transformation of data from one protocol-specific format to another. These tasks will be handled by components commonly called codecs. Netty provides a variety of components that simplify the process of creating custom codecs to support a wide range of protocols. For example, if you’re building a NetTy-based mail server, You’ll find Netty’s support for codecs invaluable for implementing POP3, IMAP, and SMTP
0 What is a codec
Every web application must be defined
- How do you parse raw bytes that are transferred back and forth between two nodes
- How to convert it to the target application’s data format
This conversion logic is handled by the codec, which consists of an encoder and a decoder, each of which can convert a byte stream from one format to another
So what’s the difference? If you think of a message as a structured sequence of bytes with specific meaning for a particular application — its data. The encoder converts the message into a format suitable for transmission (most likely a byte stream); The decoder converts the network byte stream back to the application’s message format. Thus, the encoder operates on outbound data, while the decoder processes inbound data. With this background information in mind, let’s examine the classes provided by Netty for implementing these two components.
1 Overview of Netty decoding
1.1 Two Questions
In this section, we’ll examine the decoder classes provided by Netty, which cover two different use cases
- Decode bytes into messages – ByteToMessageDecoder and ReplayingDecoder
- Decode one message type into another – MessageToMessageDecoder
Since the decoder is responsible for converting inbound data from one format to another, it shouldn’t surprise you to know that Netty’s decoder implements ChannelInboundHandler. When is the decoder used? It’s simple: it’s used whenever inbound data needs to be converted to the next channel-InboundHandler in the ChannelPipeline. In addition, thanks to ChannelPipeline’s design, multiple decoders can be linked together to implement arbitrarily complex conversion logic, This is also a good example of how Netty supports modularization and reuse of code
Abstract decoder ByteToMessageDecoder
2.1 the sample
Decoding a byte into a message (or another sequence of bytes) is such a common task that Netty specifically provides an abstract base class for it :ByteToMessageDecoder Since it’s impossible to know if a remote node will send the entire message all at once, So this class buffers the inbound data until it’s ready to process it
ByteBuf
ChannelPipeline
ChannelInboundHandler
ByteToMessageDecoder
ByteToMessageDecoder
readInt()
2.2 Source Code Parsing
2.2.1 Cumulative byte stream
@Override public void channelRead(ChannelHandlerContext ctx, Object MSG) throws Exception {// Decode based on ByteBuf, if the current Object is not propagated directly down if (MSG instanceof ByteBuf) {CodecOutputList out = CodecOutputList.newInstance(); try { ByteBuf data = (ByteBuf) msg; Cumulation == null; // If the current accumulator is empty, data is read from the I/O stream for the first time. Cumulation = data; cumulation = data; cumulation = data; Cumulator = cumulator.cumulate(ctx.alloc(), cumulation, data); cumulation = cumulator.alloc (), cumulation, data); } // Call the decoding method of the subclass to parse callDecode(CTX, cumulation, out); } catch (DecoderException e) { throw e; } catch (Throwable t) { throw new DecoderException(t); } finally { if (cumulation ! = null && ! cumulation.isReadable()) { numReads = 0; cumulation.release(); cumulation = null; } else if (++ numReads >= discardAfterReads) { // We did enough reads already try to discard some bytes so we not risk to see a OOME. // See https://github.com/netty/netty/issues/4275 numReads = 0; discardSomeReadBytes(); } int size = out.size(); decodeWasNull = ! out.insertSinceRecycled(); fireChannelRead(ctx, out, size); out.recycle(); } } else { ctx.fireChannelRead(msg); }}Copy the code
Cumulator is
MERGE_CUMULATOR
public static final Cumulator MERGE_CUMULATOR = new Cumulator() { @Override public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) { ByteBuf buffer; / / the current pointer back some bytes written, if more than the maximum capacity, expansion the if (cumulation. WriterIndex () > cumulation. MaxCapacity () - in. ReadableBytes () | | cumulation.refCnt() > 1) { // Expand cumulation (by replace it) when either there is not more room in the buffer // or if the refCnt is greater then 1 which may happen when the user use slice().retain() or // duplicate().retain(). // // See: // - https://github.com/netty/netty/issues/2327 // - https://github.com/netty/netty/issues/1764 buffer = expandCumulation(alloc, cumulation, in.readableBytes()); } else { buffer = cumulation; } // Write the current data to the accumulator buffer.writeBytes(in); // Release the data object in.release(); return buffer; }};Copy the code
2.2.2 Call the decode method of the subclass for parsing
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {try {// As long as the accumulator has data, the loop will continue while (in.isreadable ()) {int outSize = out.size(); If (outSize > 0) {if (outSize > 0) {if (outSize > 0) {if (outSize > 0) {fireChannelRead(CTX, out, outSize); out.clear(); // Check if this handler was removed before continuing with decoding. // If it was removed, it is not safe to continue to operate on the buffer. // // See: // - https://github.com/netty/netty/issues/4635 if (ctx.isRemoved()) { break; } outSize = 0; } // Record the length of the current readable data int oldInputLength = in.readableBytes(); decode(ctx, in, out); // Check if this handler was removed before continuing the loop. // If it was removed, it is not safe to continue to operate on the buffer. // // See https://github.com/netty/netty/issues/1664 if (ctx.isRemoved()) { break; } if (outSize == out.size()) {if (oldInputLength == in.readableBytes()) {break; } else { continue; If (oldInputLength == in.readableBytes()) {throw new DecoderException(); StringUtil.simpleClassName(getClass()) + ".decode() did not read anything but decoded a message."); } if (isSingleDecode()) { break; } } } catch (DecoderException e) { throw e; } catch (Throwable cause) { throw new DecoderException(cause); }}Copy the code
2.2.2 Propagating the parsed ByteBuf
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof ByteBuf) { CodecOutputList out = CodecOutputList.newInstance(); try { ByteBuf data = (ByteBuf) msg; first = cumulation == null; if (first) { cumulation = data; } else { cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data); } callDecode(ctx, cumulation, out); } catch (DecoderException e) { throw e; } catch (Throwable t) { throw new DecoderException(t); } finally { if (cumulation ! = null && ! cumulation.isReadable()) { numReads = 0; cumulation.release(); cumulation = null; } else if (++ numReads >= discardAfterReads) { // We did enough reads already try to discard some bytes so we not risk to see a OOME. // See https://github.com/netty/netty/issues/4275 numReads = 0; discardSomeReadBytes(); } // Record the length of the current list int size = out.size(); // Propagate decodeWasNull =! out.insertSinceRecycled(); fireChannelRead(ctx, out, size); out.recycle(); } } else { ctx.fireChannelRead(msg); }}Copy the code
Reference counts in codecs
For the encoder and decoder: once the message is encoding or decoding, it will be ReferenceCountUtil. Release calls automatically release (the message) If you need to keep reference for later use, Then you can call the ReferenceCountUtil. Retain (message) this will increase the reference count, thus preventing the message be released
3 Analysis based on fixed length decoder
/** * A decoder that splits the received {@link ByteBuf}s by the fixed number * of bytes. For example, if you received the following four fragmented packets: * <pre> * +---+----+------+----+ * | A | BC | DEFG | HI | * +---+----+------+----+ * </pre> * A {@link FixedLengthFrameDecoder}{@code (3)} will decode them into the * following three packets with the fixed length: * <pre> * +-----+-----+-----+ * | ABC | DEF | GHI | * +-----+-----+-----+ * </pre> */ public class FixedLengthFrameDecoder extends ByteToMessageDecoder { private final int frameLength; /** * Creates a new instance. * * @param frameLength the length of the frame */ public FixedLengthFrameDecoder(int frameLength) { if (frameLength <= 0) { throw new IllegalArgumentException( "frameLength must be a positive integer: " + frameLength); } this.frameLength = frameLength; } @Override protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { Object decoded = decode(ctx, in); if (decoded ! = null) { out.add(decoded); } } /** * Create a frame out of the {@link ByteBuf} and return it. * * @param ctx the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to * @param in the {@link ByteBuf} from which to read data * @return frame the {@link ByteBuf} which represent the frame or {@code null} if no frame could * be created. */ protected Object decode( @SuppressWarnings("UnusedParameters") ChannelHandlerContext ctx, ByteBuf in) throws Exception {// Check whether the number of bytes in the current totalizer is smaller than frameLength if (in.readableBytes() < frameLength) {return null; } else { return in.readRetainedSlice(frameLength); }}}Copy the code
4 lines of decoder analysis
4.1 Locating the End of a Row
4.2 Non-discard mode
4.2.1 Finding a newline Character
4.2.2 No Newline character Is Found
If the resolvable length exceeds the maximum resolvable length, the read pointer moves to the write pointer bit (i.e. discarded) and an exception is propagated
4.3 Discard Mode
Find the newline character
Record how many bytes are currently discarded (discarded + to be discarded this time) Lock newline type Move the read pointer directly to the newline after discarded bytes reset to non-discarded state All bytes are discarded before the fast failure mechanism is triggered
A newline character cannot be found
Moves the read pointer directly to the write pointer by directly recording the currently discarded bytes (discarded + currently readable bytes)
5 Decoder analysis based on delimiter
-
The constructor passes in a series of delimiters that divide the binary stream into complete packets through the decoder
-
Decode method
5.1 Analysis of decoding steps
5.1.1 Line processors
-
Line processor decision
-
Define the location
-
Initialization position
-
Judgment separator
5.1.2 Finding the minimum Separator
Iterates through all delimiters to calculate the length of the packet split by each delimiter
5.1.3 decoding
5.1.3.1 Finding the delimiter
If it is not empty, it indicates that the separator has been found as before, and determines whether the current is in discard mode
Non-discarding mode
It’s obviously false the first time, so it’s not a discard mode
If the current packet is larger than the maximum data length allowed for parsing, skip (discard) the packet with the minimum delimiter.
Discard mode
5.1.3.2 No delimiter found
5.1.3.2.1 Non-discard mode
The number of discarded bytes is recorded when the current readable bytes are larger than the maximum data length allowed to be parsed
5.1.3.2.2 Discard Mode
6. Analysis of decoder parameters based on length domain
The important parameters
-
MaxFrameLength (maximum package length)
Netty will do some special processing to prevent memory overflow if the packet size exceeds the maximum size
-
LengthFieldOffset (message body length)
The offset of the length field is lengthFieldOffset. 0 indicates that there is no offset
ByteBuf
Where does it startlength
field -
lengthFieldLength
The length of the domain
length
Field length -
lengthAdjustment
In some cases, the header may also be included in the length length, or the length field may be followed by something that is not included in the length length, which can be adjusted by lengthAdjustment
-
initialBytesToStrip
This setting allows you to dynamically split the received ByteBuf by the value of a field in the message that represents the length of the message if the header is not needed for the data passed to subsequent handlers
6.1 Length Based
LengthFieldBasedFrameDecoder
6.2 Based on length truncation
If the application layer decoder does not need to use the length field, we expect Netty to unpack it so
initialBytesToStrip
initialBytesToStrip
No length field
6.3 Based on offset length
magicNumber
protocol version
meta
lengthFieldOffset
6.4 Unpacking Based on adjustable length
In some cases, binary protocols may be designed as follows
header
- The length field at the front of the packet indicates no offset,
lengthFieldOffset
0 - The length of the length field is 3, i.e
lengthFieldLength
For 3 - The length field represents the length of the package that skips the header and takes another parameter
lengthAdjustment
, the size of the packet length adjustment, the length represented by the value of the length field plus this correction represents the packet with header, here is12 + 2
, the header and the package are 14 bytes
6.5 Truncation based on offset adjustable length
A binary protocol has two headers
HDR1
header
HDR1
magicNumber
magicNumber
Parameter Settings
- The offset in the length field is 1, i.e
lengthFieldOffset
1 - The length domain has length 2, i.e
lengthFieldLength
For 2 - The length of the inclusion represented by the length field is omitted
HDR2
But when unpackingHDR2
It’s also taken apart by Netty as part of the package,HDR2
Of length 1, i.elengthAdjustment
1 - When finished, truncate the first three bytes, i.e
initialBytesToStrip
For 3
6.6 Truncation of adjustable length of variation based on offset
All the preceding length fields indicate the length of the packet without header. If the length fields include the length of the entire packet, see the following
HDR1
HDR2
1 + 1 + 2 + 12 = 16
Parameter Settings
Because Netty does not know the service situation, it needs to tell Netty how many bytes are added after the length field to form a complete packet. In this case, the length field is 13 bytes, and the length field is 16 bytes. Therefore, minus 3 bytes is the required length for real packet unpacking. LengthAdjustment – 3
If your protocol is based on length, consider not implementing it in bytes, but using it directly, or inheriting it and simply modifying it
Decoder analysis based on length domain
7.1 Construction Method
public LengthFieldBasedFrameDecoder(ByteOrder byteOrder, int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip, Boolean failFast) {// omit parameter validation this.byteOrder = byteOrder; this.maxFrameLength = maxFrameLength; this.lengthFieldOffset = lengthFieldOffset; this.lengthFieldLength = lengthFieldLength; this.lengthAdjustment = lengthAdjustment; lengthFieldEndOffset = lengthFieldOffset + lengthFieldLength; this.initialBytesToStrip = initialBytesToStrip; this.failFast = failFast; }Copy the code
Save the pass parameters in the field
- Whether the byteOrder byte stream represents the data at the big end or the small end for reading in the length field
- LengthFieldEndOffset Specifies the offset of the first byte immediately following the length field field in the entire packet
- failFast
- For true the table reads the length field, the value of TA exceeds
maxFrameLength
, just throwTooLongFrameException
- for
false
The table is thrown only after it has actually read the bytes represented by the value of the length fieldTooLongFrameException
, the default value istrue
, do not change the value. Otherwise, memory overflow may occur
- For true the table reads the length field, the value of TA exceeds
7.2 Abstraction for Package unpacking
Specific unpacking protocols only need to be implemented
void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)Copy the code
In table has not opened the data so far, after the package is added to the out list to achieve the package down
-
Level 1 implementation
The overloaded protected method decode implements true unpacking in three steps
1 Calculate the length of data packets to be extracted
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {// Get the actual unadjusted package length long frameLength = getUnadjustedFrameLength(in, actualLengthFieldOffset, lengthFieldLength, byteOrder); if (frameLength < lengthFieldEndOffset) { failOnFrameLengthLessThanLengthFieldEndOffset(in, frameLength, lengthFieldEndOffset); } if (frameLength > maxFrameLength) { exceededFrameLength(in, frameLength); return null; }}Copy the code
-
Get the actual byte offset of the length field
-
Adjust the length of the package
-
If the current readable byte has not reached the offset of the length field, it indicates that the length field cannot be read
The getUnadjustedFrameLength method can be overwritten if the value represented by your length field is not a basic int,short, etc
- The length of the check
-
If the length of the entire packet is not longer than the length field, an exception is thrown directly
-
The packet length exceeded the maximum packet length and entered the discard mode. Procedure
- The current readable bytes have been reached
frameLength
, skip itframeLength
After the packet is discarded, it may be a valid packet - The current readable byte is not reached
frameLength
“Indicates that unread bytes are also discarded. In the discard mode, the current accumulated bytes are discarded
- The current readable bytes have been reached
Bytesttestindicates how many bytes remain to be discarded
- Finally, call
failIfNecessary
Determine whether an exception needs to be thrown- There is no need to discard subsequent unread bytes (
bytesToDiscard == 0
) to reset the discard state- If fast failure is not set (
! failFast
), or is set to fail fast and is the first time a large packet error is detected (firstDetectionOfTooLongFrame
), throws an exception and lets handler handle it -
If fast failure is set and a packaging error is detected for the first time, throw an exception and let handler handle it
- If fast failure is not set (
- There is no need to discard subsequent unread bytes (
We can know failFast default is true, but here firstDetectionOfTooLongFrame to true, so, the first detected parcel will throw an exception
3 Processing the discard mode
LengthFieldBasedFrameDecoder decoder method at the entrance there is a piece of code
- If you are in discard mode, calculate the number of bytes to be discarded first and take the minimum value of current discarded bytes and readable bytes. After discarding, enter
failIfNecessary
If failFast is set to false, exceptions will not be thrown until failFast is discarded
2 Skips logical processing of the specified length of bytes
After discarding mode processing and length verification are passed
- Verify whether enough bytes have been read. If yes, extract a complete packet according to
initialBytesToStrip
Of course, the skipped bytes must not be larger than the packet length, otherwise thrownCorruptedFrameException
abnormal
Sampling frame
-
Get the read pointer of the current accumulated data, and then get the actual length of the data packet to be extracted for extraction. After extraction, move the read pointer
- The extraction process is called once
ByteBuf
的retainedSlice
API, which has no memory copy overhead
To see from the actual packet extraction, the parameter passed in is an int, so in a custom protocol, if your length field is 8 bytes, the first 4 bytes are of little use
summary
- If you use Netty and the binary protocol is length based, consider using it
LengthFieldBasedFrameDecoder
By adjusting the parameters, it will satisfy you -
LengthFieldBasedFrameDecoder
Unpacking includes valid parameter verification, exception package handling, and finally calling ByteBuf’s retainedSlice to implement unpacking without memory copy