This article is divided into four parts:
- Sticky packets, split packets, and solutions
- Code implementation
- ByteToMessageDecoder source analysis
- Process flow chart
Split packets
When a large amount of data is transmitted, TCP splits it into several segments. Because of network delay, a data block that was originally complete may arrive as two packets, and when the receiver starts reading, only the first packet may have arrived. What should it do? If it processes that packet right away, the data is incomplete. If it waits, how does it know when the data is complete?
Sticky packets
TCP transmits data as a byte stream, and received data is stored in a buffer. Although the data is sent packet by packet, with network delay the second packet may arrive while the data of the first packet is still in the buffer, so the data of the second packet is stored in the same buffer. The reader then cannot tell where the first packet ends and where the second packet begins. This is called TCP sticky packets.
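To make the problem concrete, here is a minimal sketch that uses plain byte arrays instead of a real socket. The class name StreamBoundaryDemo and the helper readInChunks are hypothetical names introduced for illustration only. The sender writes the two messages HELLO and WORLD, but the receiver only sees one concatenated byte sequence and may read it in chunks of any size.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class StreamBoundaryDemo {

    /** Cuts a byte stream into chunks of at most chunkSize bytes, as an arbitrary read pattern might. */
    static List<String> readInChunks(byte[] stream, int chunkSize) {
        List<String> chunks = new ArrayList<>();
        for (int from = 0; from < stream.length; from += chunkSize) {
            int to = Math.min(from + chunkSize, stream.length);
            chunks.add(new String(Arrays.copyOfRange(stream, from, to), StandardCharsets.US_ASCII));
        }
        return chunks;
    }

    public static void main(String[] args) {
        // Two messages, "HELLO" and "WORLD", end up as one run of bytes on the wire.
        byte[] stream = "HELLOWORLD".getBytes(StandardCharsets.US_ASCII);
        // One read returns a whole message plus part of the next one (sticky + split).
        System.out.println(readInChunks(stream, 7)); // [HELLOWO, RLD]
        // Smaller reads cut every message into pieces (split).
        System.out.println(readInChunks(stream, 3)); // [HEL, LOW, ORL, D]
    }
}
```

Neither read pattern lines up with the message boundaries, which is why the receiver needs extra information inside the stream itself.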
The solution
Solution 1: define a custom protocol packet, which handles both sticky packets and incomplete packets. A user-defined protocol packet includes:
- A start flag: for example, a four-byte int. Reading the start flag marks the beginning of a packet.
- Data length: also a four-byte int, giving the size of the data block, so the receiver knows whether the whole packet has arrived; if not, it waits.
- Data: the actual data being transmitted.
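The layout above can be sketched with java.nio.ByteBuffer. This is only an illustration under stated assumptions: the class name FrameLayoutSketch and the flag value 0x76 are placeholders, and a real protocol would pick its own flag.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class FrameLayoutSketch {
    // Assumed start flag; any agreed-upon int value would do.
    static final int HEAD_DATA = 0x76;

    /** Builds one packet: 4-byte start flag, 4-byte data length, then the data itself. */
    static byte[] encode(byte[] content) {
        ByteBuffer buf = ByteBuffer.allocate(4 + 4 + content.length);
        buf.putInt(HEAD_DATA);       // start flag
        buf.putInt(content.length);  // data length
        buf.put(content);            // data
        return buf.array();
    }

    public static void main(String[] args) {
        byte[] frame = encode("hello".getBytes(StandardCharsets.US_ASCII));
        System.out.println(frame.length); // 13 = 4 + 4 + 5
    }
}
```

ByteBuffer writes ints in big-endian order by default, which matches what Netty's ByteBuf.writeInt() does.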
Code implementation
Protocol packet class
/**
 * Packet format defined by the custom protocol:
 *
 * +---------------------+-------------+------+
 * | Protocol start flag | Data length | Data |
 * +---------------------+-------------+------+
 *
 * 1. Protocol start flag head_data: an int, 0X76 in hexadecimal.
 * 2. Length of the transmitted data, contentLength: an int.
 * 3. The data to transfer.
 */
public class CustomDate {

    /** Message header: a constant start flag taken from Costom.HEAD_DATA. */
    private final int head_Date = Costom.HEAD_DATA.getVaule();

    /** Message length. */
    private int contentLength;

    /** Message contents. */
    private byte[] conctent;

    public CustomDate() {
        super();
    }

    public CustomDate(int contentLength, byte[] conctent) {
        this.contentLength = contentLength;
        this.conctent = conctent;
    }

    public int getContentLength() {
        return contentLength;
    }

    public void setContentLength(int contentLength) {
        this.contentLength = contentLength;
    }

    public byte[] getConctent() {
        return conctent;
    }

    public void setConctent(byte[] conctent) {
        this.conctent = conctent;
    }

    public int getHead_Date() {
        return head_Date;
    }
}
Decoder class
Core ideas:
- 1 Before reading, check whether the readable bytes reach the basic length (start flag + data length).
- 2 If the buffer holds too much data (more than 2048 bytes), this is abnormal, so the reader index is moved past the buffered bytes to discard them. This can be caused by network latency or by someone maliciously sending large amounts of data.
- 3 Start reading the buffer. First mark the reader index, then look for the start flag. If the bytes read are not the start flag, return to the mark and skip one byte.
- 4 If the start flag is found, read the length next. If the length is larger than the number of readable bytes in the buffer, some data has not arrived yet: roll back to the mark and keep waiting for data.
- 5 If all the data has arrived, read the data area.
The decoder extends the ByteToMessageDecoder class, which converts bytes read from the network buffer into meaningful message objects.
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;

/**
 * Decodes packets in the custom protocol format:
 * start flag (int) + data length (int) + data.
 */
public class CustomDecoder extends ByteToMessageDecoder {

    /** Basic length of a packet: start flag (4 bytes) + data length (4 bytes). */
    private final int BASE_LENGTH = 4 + 4;

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
        // 1. Only continue when at least the start flag and the length can be read
        if (buffer.readableBytes() >= BASE_LENGTH) {
            // 2. Too much buffered data is abnormal: discard it
            if (buffer.readableBytes() > 2048) {
                // Move readerIndex past everything that is buffered
                buffer.skipBytes(buffer.readableBytes());
                // Nothing is left to read, so stop here instead of calling readInt()
                return;
            }
            // 3. Start reading
            while (true) {
                // Mark the index where this packet is expected to begin
                buffer.markReaderIndex();
                // Found the protocol start flag
                if (buffer.readInt() == Costom.HEAD_DATA.getVaule()) {
                    break;
                }
                // The start flag was not read: return to the mark
                buffer.resetReaderIndex();
                // Skip one byte
                buffer.readByte();
                // Not enough bytes left for a header: wait for more data
                if (buffer.readableBytes() < BASE_LENGTH) {
                    return;
                }
            }
            // 4. Get the message length
            int length = buffer.readInt();
            // The data has not fully arrived: roll back to the mark and wait
            if (buffer.readableBytes() < length) {
                buffer.resetReaderIndex();
                return;
            }
            // 5. Read the data area
            byte[] date = new byte[length];
            buffer.readBytes(date);
            CustomDate customDate = new CustomDate(length, date);
            out.add(customDate);
        }
    }
}
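The decoding steps can also be tried without Netty. The following is a minimal sketch using java.nio.ByteBuffer in place of ByteBuf. FrameDecoderSketch, decodeOne, feed, frame, and the flag value 0x76 are hypothetical names and values chosen for illustration; this is not the decoder above, only the same mark, scan, and roll-back idea. It also skips a negative length field, which is an added safeguard.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class FrameDecoderSketch {
    static final int HEAD_DATA = 0x76;     // assumed start flag
    static final int BASE_LENGTH = 4 + 4;  // start flag + data length

    /** Decodes one payload from buf (in read mode), or returns null if more data is needed. */
    static byte[] decodeOne(ByteBuffer buf) {
        while (buf.remaining() >= BASE_LENGTH) {
            buf.mark();                        // remember where this candidate packet starts
            if (buf.getInt() != HEAD_DATA) {
                buf.reset();
                buf.get();                     // not the start flag: skip one byte and rescan
                continue;
            }
            int length = buf.getInt();
            if (length < 0) {                  // corrupt length: treat like a missing flag
                buf.reset();
                buf.get();
                continue;
            }
            if (buf.remaining() < length) {
                buf.reset();                   // data incomplete: roll back and wait
                return null;
            }
            byte[] content = new byte[length];
            buf.get(content);
            return content;
        }
        return null;
    }

    /** Appends a received chunk to the accumulated bytes and decodes every complete packet. */
    static List<String> feed(ByteBuffer cumulation, byte[] chunk) {
        cumulation.put(chunk);
        cumulation.flip();                     // switch to read mode
        List<String> out = new ArrayList<>();
        byte[] content;
        while ((content = decodeOne(cumulation)) != null) {
            out.add(new String(content, StandardCharsets.US_ASCII));
        }
        cumulation.compact();                  // keep unread bytes for the next chunk
        return out;
    }

    /** Builds one packet: start flag, data length, data. */
    static byte[] frame(String text) {
        byte[] content = text.getBytes(StandardCharsets.US_ASCII);
        return ByteBuffer.allocate(BASE_LENGTH + content.length)
                .putInt(HEAD_DATA).putInt(content.length).put(content).array();
    }

    public static void main(String[] args) {
        byte[] a = frame("hello");
        byte[] b = frame("netty!");
        byte[] stream = new byte[a.length + b.length];
        System.arraycopy(a, 0, stream, 0, a.length);
        System.arraycopy(b, 0, stream, a.length, b.length);

        ByteBuffer cumulation = ByteBuffer.allocate(64);
        // First read: one complete packet plus the first 7 bytes of the next one.
        System.out.println(feed(cumulation, Arrays.copyOfRange(stream, 0, 20)));             // [hello]
        // Second read: the rest of the second packet.
        System.out.println(feed(cumulation, Arrays.copyOfRange(stream, 20, stream.length))); // [netty!]
    }
}
```

The first call returns only the complete packet and keeps the 7 leftover bytes, and the second call completes the split packet. Feeding all 27 bytes at once would return both messages in a single call.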
Encoder class
The encoder simply writes the packet data into the buffer. It extends MessageToByteEncoder.
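import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

// The type parameter is required so that encode() receives a CustomDate
public class CustomEncoder extends MessageToByteEncoder<CustomDate> {

    @Override
    protected void encode(ChannelHandlerContext ctx, CustomDate msg, ByteBuf out) throws Exception {
        // Write the fields in protocol order: start flag, data length, data
        out.writeInt(msg.getHead_Date());
        out.writeInt(msg.getContentLength());
        out.writeBytes(msg.getConctent());
    }
}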
Add to the pipeline
public class ServerHandlerInitializer extends ChannelInitializer<Channel> {

    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new CustomDecoder());
        pipeline.addLast(new CustomEncoder());
        pipeline.addLast(new ServerHandler());
    }
}
Output protocol data
public class ServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof CustomDate) {
            CustomDate customDate = (CustomDate) msg;
            byte[] conctent = customDate.getConctent();
            System.out.println("Fetch contents " + new String(conctent));
            ReferenceCountUtil.release(msg);
        }
    }
}
Process analysis
Let's look at the decoding process. 1 The custom decoder class extends ByteToMessageDecoder, so start with the ByteToMessageDecoder class:
public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter{}
As you can see, ByteToMessageDecoder extends ChannelInboundHandlerAdapter, which means incoming data is handled by overriding the channelRead() method. Next, look at ByteToMessageDecoder's channelRead() method:
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    if (msg instanceof ByteBuf) {
        CodecOutputList out = CodecOutputList.newInstance();
        try {
            ByteBuf data = (ByteBuf) msg;
            // The first read uses the data directly; later reads are appended to the accumulated buffer
            first = cumulation == null;
            if (first) {
                cumulation = data;
            } else {
                cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
            }
            // 2 Start decoding
            callDecode(ctx, cumulation, out);
        } finally {
            // resource release code
        }
    } else {
        ctx.fireChannelRead(msg);
    }
}
The focus is on step 2, the callDecode() method, which is where decoding starts. Let's look at that method:
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
    try {
        // 1
        while (in.isReadable()) {
            int outSize = out.size();
            // 2
            if (outSize > 0) {
                fireChannelRead(ctx, out, outSize);
                out.clear();
                if (ctx.isRemoved()) {
                    break;
                }
                outSize = 0;
            }
            // 3
            int oldInputLength = in.readableBytes();
            // 4
            decodeRemovalReentryProtection(ctx, in, out);
            if (ctx.isRemoved()) {
                break;
            }
            // 5
            if (outSize == out.size()) {
                if (oldInputLength == in.readableBytes()) {
                    break;
                } else {
                    continue;
                }
            }
            if (oldInputLength == in.readableBytes()) {
                throw new DecoderException();
            }
            if (isSingleDecode()) {
                break;
            }
        }
    } catch (DecoderException e) {
        throw e;
    } catch (Exception cause) {
        throw new DecoderException(cause);
    }
}
1 A while loop keeps processing the buffer as long as it still contains readable data.
2 This is a very good design. The out variable stores the objects produced by decoding. If out already contains objects, they are passed to the next handler (the ServerHandler that prints the data in this program) through the fireChannelRead() method.
The reason is sticky packets: after the data of one packet has been processed, the buffer may still contain the data of the next packet. So the decoded packet is sent to the next handler before more data is read from the buffer.
3 Make a mark: record the number of readable bytes before decoding, so that afterwards it is possible to tell whether this round of decoding read any data. If it did not, the while loop ends.
Why make this mark, and why end the loop? Because if no data was read from the buffer, the data is incomplete and the decoder has to wait for the rest before processing it. So the while loop ends and waits for the next round.
4 decodeRemovalReentryProtection() calls the decode() method that we overrode.
final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
    decodeState = STATE_CALLING_CHILD_DECODE;
    try {
        // Call the decode() method overridden by the subclass
        decode(ctx, in, out);
    } finally {
        // omitted
    }
}
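5 This is where the mark from step 3 is checked to decide whether to exit the loop: if decoding produced no new object and the number of readable bytes is unchanged, the loop breaks.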
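The loop logic can be isolated in a small simulation. This is a simplified sketch, not Netty's code: CallDecodeLoopSketch and its Decoder interface are hypothetical stand-ins, a plain list plays the role of the next handler, and the handler-removal and single-decode checks are left out.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

class CallDecodeLoopSketch {

    /** Hypothetical stand-in for a subclass's decode(): may read from in and add messages to out. */
    interface Decoder {
        void decode(ByteBuffer in, List<Object> out);
    }

    static List<Object> callDecode(Decoder decoder, ByteBuffer in) {
        List<Object> delivered = new ArrayList<>();  // plays the role of the next handler
        List<Object> out = new ArrayList<>();
        while (in.hasRemaining()) {
            int outSize = out.size();
            if (outSize > 0) {
                delivered.addAll(out);               // pass decoded messages on before reading more
                out.clear();
                outSize = 0;
            }
            int oldInputLength = in.remaining();     // the "mark" from step 3
            decoder.decode(in, out);
            if (outSize == out.size()) {
                if (oldInputLength == in.remaining()) {
                    break;                           // nothing read, nothing produced: wait for data
                }
                continue;                            // bytes were consumed but no message yet
            }
            if (oldInputLength == in.remaining()) {
                throw new IllegalStateException("decode() produced a message without reading anything");
            }
        }
        delivered.addAll(out);                       // forward whatever the last round produced
        return delivered;
    }

    public static void main(String[] args) {
        ByteBuffer in = ByteBuffer.allocate(10);
        in.putInt(1).putInt(2).put((byte) 9).put((byte) 9);
        in.flip();
        // A toy decoder that turns every 4 bytes into one Integer.
        Decoder fourByteInts = (buf, out) -> {
            if (buf.remaining() >= 4) {
                out.add(buf.getInt());
            }
        };
        System.out.println(callDecode(fourByteInts, in)); // [1, 2]
        System.out.println(in.remaining());               // 2 bytes stay buffered for the next read
    }
}
```

In the third round the toy decoder reads nothing and produces nothing, so the loop breaks and the two leftover bytes wait for more data, which is the behavior described in steps 3 and 5.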
Process flow chart
Still confused after the code analysis? Let's walk through it again with the flow chart.