“This is the 21st day of my participation in the First Challenge 2022. For details: First Challenge 2022”
Netty: sticky packets and packet splitting
TCP packet sticking and splitting refer to the situation in which several packets sent by the sender arrive glued together as one, or a single packet is split across several reads. For example, the client sends two packets, D1 and D2, but the server may receive them as two separate reads, as one merged read (sticking), or with one of the packets cut across two reads (splitting).
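The cases above can be imitated without any networking. The following is a minimal sketch, assuming the two packets are the ASCII strings `D1-hello` and `D2-world`; the class name `StickyPacketDemo` and the chunk sizes are made up for illustration. It cuts one continuous byte stream into reads of different sizes, the way a receiver might see it.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class; not part of the chat example.
final class StickyPacketDemo {

    // Cuts one continuous byte stream into reads of the given sizes.
    // The sizes are arbitrary assumptions standing in for what TCP delivers.
    static List<String> readInChunks(byte[] stream, int... sizes) {
        List<String> reads = new ArrayList<>();
        int pos = 0;
        for (int size : sizes) {
            int len = Math.min(size, stream.length - pos);
            reads.add(new String(stream, pos, len, StandardCharsets.US_ASCII));
            pos += len;
        }
        return reads;
    }

    public static void main(String[] args) {
        // D1 and D2 written back to back become one stream of bytes.
        byte[] stream = "D1-helloD2-world".getBytes(StandardCharsets.US_ASCII);

        System.out.println(readInChunks(stream, 8, 8));   // [D1-hello, D2-world]
        System.out.println(readInChunks(stream, 16));     // [D1-helloD2-world]
        System.out.println(readInChunks(stream, 5, 11));  // [D1-he, lloD2-world]
    }
}
```

Only the first call happens to line up with the packet boundaries; the receiver cannot rely on that, which is why the application needs its own framing.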
Why sticky packets occur
TCP is connection-oriented and stream-oriented, and it provides a highly reliable service. Both ends (client and server) need a paired socket. To send many small packets more efficiently, the sender applies an optimization (the Nagle algorithm) that merges data that is small and written at short intervals into one larger block before sending it. This improves efficiency, but it makes it hard for the receiver to tell where one message ends and the next begins, because stream-oriented communication has no message boundaries. The following code shows an example of sticky packets:
Server code
Service startup code
package com.jony.netty.chat;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

public class ChatServer {
    public static void main(String[] args) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            // Add the string decoder to the pipeline
                            pipeline.addLast("decoder", new StringDecoder());
                            // Add the string encoder to the pipeline
                            pipeline.addLast("encoder", new StringEncoder());
                            // Add our own business handler
                            pipeline.addLast(new ChatServerHandler());
                        }
                    });
            System.out.println("Chat room server started...");
            ChannelFuture channelFuture = bootstrap.bind(9999).sync();
            // Block until the server channel is closed
            channelFuture.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
Send and receive message code
package com.jony.netty.chat;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;

import java.text.SimpleDateFormat;

public class ChatServerHandler extends SimpleChannelInboundHandler<String> {

    // GlobalEventExecutor.INSTANCE is a global event executor and a singleton
    private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    // Called when the channel becomes active, i.e. the client has connected
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        // writeAndFlush on the group iterates over every channel in it and sends the message
        channelGroup.writeAndFlush("[client] " + channel.remoteAddress() + " came online at "
                + sdf.format(new java.util.Date()) + "\n");
        // Add the current channel to channelGroup
        channelGroup.add(channel);
        System.out.println(ctx.channel().remoteAddress() + " online" + "\n");
    }

    // Called when the channel becomes inactive, i.e. the client has disconnected
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        // Tell the clients that are still online that this client has left
        channelGroup.writeAndFlush("[client] " + channel.remoteAddress() + " went offline" + "\n");
        System.out.println(ctx.channel().remoteAddress() + " offline" + "\n");
        System.out.println("channelGroup size=" + channelGroup.size());
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        // Obtain the current channel
        Channel channel = ctx.channel();
        // Forward the message to every other client; the sender gets a "[self]" echo
        channelGroup.forEach(ch -> {
            if (channel != ch) {
                ch.writeAndFlush("[client] " + channel.remoteAddress() + " sent a message: " + msg + "\n");
            } else {
                ch.writeAndFlush("[self] sent a message: " + msg + "\n");
            }
        });
        System.out.println(msg);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // Close the channel
        ctx.close();
    }
}
Client code
Client connection code
package com.jony.netty.chat;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

import java.util.Scanner;

public class ChatClient {
    public static void main(String[] args) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap()
                    .group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new StringEncoder());
                            pipeline.addLast(new StringDecoder());
                            pipeline.addLast(new ChatClientHandler());
                        }
                    });
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9999).sync();
            Channel channel = channelFuture.channel();
            System.out.println("========" + channel.localAddress() + "========");
            // Interactive mode (disabled): read lines from the console and send them
            // Scanner scanner = new Scanner(System.in);
            // while (scanner.hasNextLine()) {
            //     String msg = scanner.nextLine();
            //     // Send to the server via the channel
            //     channel.writeAndFlush(msg);
            // }
            // Send 200 messages in quick succession to provoke sticky packets
            for (int i = 0; i < 200; i++) {
                channel.writeAndFlush("hello, jony!");
            }
        } finally {
            // Left commented out, as in the original, so the client keeps running
            // group.shutdownGracefully();
        }
    }
}
The key part of the client is the following loop, which sends messages in rapid succession
for (int i = 0; i < 200; i++) {
    channel.writeAndFlush("hello, jony!");
}
Client sends and receives message code
package com.jony.netty.chat;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

public class ChatClientHandler extends SimpleChannelInboundHandler<String> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        // Print whatever the server sent back
        System.out.println(msg.trim());
    }
}
The execution result
Each iteration of the for loop sends exactly one message, yet after TCP's optimization several messages arrive stuck together in a single read. Efficiency improves, but this is not the result we want. The following sections show how to split the messages correctly.
[himself] sends a message: Hello, Jony! Hello, which jony! Hello, which jony! [himself] sends a message: Hello, Jony! Hello, which jony! Hello, which jony! [himself] sends a message: Hello, Jony! Hello, which jony! Hello, which jony! [himself] sends a message: Hello, Jony! Hello, which jony! Hello, which jony! Hello, which jony! [himself] sends a message: Hello, Jony! Hello, which jony! [himself] sends a message: Hello, Jony! Hello, which jony! [himself] sends a message: Hello, Jony! Hello, which jony! Hello, which jony! [himself] sends a message: Hello, Jony! Hello, which jony! [himself] sends a message: Hello, Jony! Hello, which jony! Hello, which jony! [himself] sends a message: Hello, Jony! Hello, which jony! Hello, which jony! Hello, which jony! Hello, which jony! [himself] sends a message: Hello, Jony! Hello, which jony! Hello, which jony! Hello, which jony! [himself] sends a message: Hello, Jony! Hello, which jony! Hello, which jony! [himself] sends a message: Hello, Jony! Hello, which jony! Hello, which jony! Hello, which jony! [himself] sends a message: Hello, Jony! Hello, which jony! Hello, which jony! Hello, which jony! [himself] sends a message: Hello, Jony! Hello, which jony! Hello, which jony! [himself] sends a message: Hello, Jony! Hello, which jony! Hello, which jony! Hello, which jony! [himself] sends a message: Hello, Jony! Hello, which jony! Hello, which jony! Hello, which jony! [himself] sends a message: Hello, Jony! Hello, which jony! Hello, which jony! [himself] sends a message: Hello, Jony! Hello, which jony! Hello, which jony! Hello, which jony! [himself] sends a message: Hello, Jony! Hello, which jony! Hello, which jony! [himself] sends a message: Hello, Jony! Hello, which jony! Hello, which jony! Hello, which jony! [himself] sends a message: Hello, Jony! Hello, which jony! Hello, which jony! Hello, which jony!Copy the code
Solutions
1) Delimited format: give every message a fixed format with start and end markers. This is simple, but the chosen start and end characters must never appear inside the message body. In practice, the sender appends a fixed delimiter to each message and the receiver splits the incoming stream on that delimiter. Disadvantages: the code is hard to maintain. If a message ever happens to contain the delimiter, the messages are split incorrectly; and other developers who maintain the code or add new logic can easily overlook the delimiter convention without realizing it.
2) Length prefix: send each message together with its length. For example, the first four bytes of each message can hold the length of the body, so the application layer can determine where each message starts and ends.
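The weakness of the first scheme is easy to reproduce. Below is a minimal sketch, assuming `|` as the delimiter; the class `DelimiterFramingDemo` and its `split` method are hypothetical and use no Netty classes. The second call shows two messages, `a|b` and `c`, being split into three because the first one contains the delimiter.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo of delimiter-based framing on a received character stream.
final class DelimiterFramingDemo {
    static final char DELIMITER = '|'; // assumed delimiter, chosen for illustration

    // Returns every complete message found before a delimiter.
    // Characters after the last delimiter are ignored as an incomplete message.
    static List<String> split(String stream) {
        List<String> messages = new ArrayList<>();
        int start = 0;
        for (int i = 0; i < stream.length(); i++) {
            if (stream.charAt(i) == DELIMITER) {
                messages.add(stream.substring(start, i));
                start = i + 1;
            }
        }
        return messages;
    }

    public static void main(String[] args) {
        // Two messages, "hello" and "world", each followed by the delimiter.
        System.out.println(split("hello|world|")); // [hello, world]

        // Two messages, "a|b" and "c": the body of the first contains the delimiter.
        System.out.println(split("a|b|c|"));       // [a, b, c]
    }
}
```

Escaping the delimiter inside the body would fix this, at the cost of extra encoding and decoding logic on both sides.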
Comparing the two schemes, the second is clearly the more reliable one, so let's go straight to the code.
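Before the Netty version, here is a minimal sketch of length-prefixed framing using only `java.nio.ByteBuffer`. It assumes a 4-byte big-endian length followed by a UTF-8 body; the class `LengthPrefixDemo` and its methods are hypothetical names. Because the reader relies on the length and never inspects the body, any character may appear in a message.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical demo of length-prefixed framing with plain JDK classes.
final class LengthPrefixDemo {

    // Encodes one message as: 4-byte length, then the UTF-8 body.
    static byte[] encode(String message) {
        byte[] body = message.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(4 + body.length);
        buf.putInt(body.length);
        buf.put(body);
        return buf.array();
    }

    // Reads one complete message; assumes the buffer holds at least one whole frame.
    static String decodeOne(ByteBuffer in) {
        int length = in.getInt();
        byte[] body = new byte[length];
        in.get(body);
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Two frames written back to back, as they would sit in a TCP stream.
        ByteBuffer stream = ByteBuffer.allocate(64);
        stream.put(encode("a|b")).put(encode("c"));
        stream.flip();

        while (stream.hasRemaining()) {
            System.out.println(decodeOne(stream)); // prints "a|b", then "c"
        }
    }
}
```

This sketch deliberately ignores partial frames; a real decoder must also handle the case where fewer bytes than the announced length have arrived.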
Unpacking
Create a wrapper class for the message
It needs two fields: the length of the content and the content itself
package com.jony.netty.split;

// Custom protocol packet
public class MyMessageProtocol {

    // Length of the content, in bytes
    private int len;
    // Message content
    private byte[] content;

    public int getLen() {
        return len;
    }

    public void setLen(int len) {
        this.len = len;
    }

    public byte[] getContent() {
        return content;
    }

    public void setContent(byte[] content) {
        this.content = content;
    }
}
Server code
Server connects and adds codecs
If the server only receives messages and never sends any, adding the decoder alone is enough
package com.jony.netty.split;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class MyServer {
    public static void main(String[] args) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            // Decoder first, then the business handler
                            pipeline.addLast(new MyMessageDecoder());
                            pipeline.addLast(new MyServerHandler());
                        }
                    });
            System.out.println("netty server start.");
            ChannelFuture channelFuture = serverBootstrap.bind(9000).sync();
            channelFuture.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
Core code: the pipeline mainly adds the decoder and the message handler
pipeline.addLast(new MyMessageDecoder());
pipeline.addLast(new MyServerHandler());
Decoder (Data processing and parsing)
The decoder first reads the content length from the start of the message, then reads that many bytes, wraps them in a MyMessageProtocol object, and passes it on to the next handler
package com.jony.netty.split;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;

public class MyMessageDecoder extends ByteToMessageDecoder {

    // Length of the body currently being waited for; 0 means no length has been read yet
    int length = 0;

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        System.out.println();
        System.out.println("MyMessageDecoder decode is called");
        // Binary bytes -> MyMessageProtocol packet (object)
        System.out.println(in);

        if (length == 0) {
            // The 4-byte length field has not fully arrived yet
            if (in.readableBytes() < 4) {
                return;
            }
            length = in.readInt();
        }
        if (in.readableBytes() < length) {
            System.out.println("Not enough readable data yet, continue waiting.");
            return;
        }
        byte[] content = new byte[length];
        in.readBytes(content);
        MyMessageProtocol messageProtocol = new MyMessageProtocol();
        messageProtocol.setLen(length);
        messageProtocol.setContent(content);
        out.add(messageProtocol);
        // Reset so that the next call reads a new length field
        length = 0;
    }
}
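The waiting behaviour of the decoder can be tried out without starting a server. The following is a rough stand-in written with plain JDK classes, not the Netty decoder itself: the class `IncrementalFrameDecoder`, its `feed` method, and the chunk boundaries are all assumptions made for illustration. Unlike the decoder above, which remembers the length in a field, this sketch rewinds to the start of the length field when the body is incomplete.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a frame decoder; plain JDK, no Netty.
final class IncrementalFrameDecoder {
    // Bytes received so far that do not yet form a complete frame (in read mode).
    private ByteBuffer pending = ByteBuffer.allocate(0);

    // Builds one frame: 4-byte big-endian length followed by the UTF-8 body.
    static byte[] frame(String message) {
        byte[] body = message.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + body.length).putInt(body.length).put(body).array();
    }

    // Accepts one chunk as delivered by the network and returns every frame it completes.
    List<String> feed(byte[] chunk) {
        ByteBuffer merged = ByteBuffer.allocate(pending.remaining() + chunk.length);
        merged.put(pending).put(chunk);
        merged.flip();

        List<String> frames = new ArrayList<>();
        while (merged.remaining() >= 4) {
            merged.mark();
            int length = merged.getInt();
            if (merged.remaining() < length) {
                merged.reset(); // body incomplete: rewind and wait for more data
                break;
            }
            byte[] body = new byte[length];
            merged.get(body);
            frames.add(new String(body, StandardCharsets.UTF_8));
        }
        pending = merged; // unread bytes stay available for the next call
        return frames;
    }

    public static void main(String[] args) {
        byte[] a = frame("hello");
        byte[] b = frame("jony");
        byte[] stream = ByteBuffer.allocate(a.length + b.length).put(a).put(b).array();

        IncrementalFrameDecoder decoder = new IncrementalFrameDecoder();
        // The first read ends in the middle of the first body, so nothing is emitted.
        System.out.println(decoder.feed(Arrays.copyOfRange(stream, 0, 6)));             // []
        // The second read completes the first frame and carries the whole second one.
        System.out.println(decoder.feed(Arrays.copyOfRange(stream, 6, stream.length))); // [hello, jony]
    }
}
```

Either way of tracking progress works; the point is that a frame is only handed on once all of its bytes have arrived.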
Read and write messages
package com.jony.netty.split;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;

public class MyServerHandler extends SimpleChannelInboundHandler<MyMessageProtocol> {

    // Number of complete messages received on this connection
    private int count;

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MyMessageProtocol msg) throws Exception {
        System.out.println("==== The server received the following message ====");
        System.out.println("length=" + msg.getLen());
        System.out.println("content=" + new String(msg.getContent(), CharsetUtil.UTF_8));
        System.out.println("Number of packets received by the server=" + (++this.count));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
Client code
Client connects and adds related components
package com.jony.netty.split;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class MyClient {
    public static void main(String[] args) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group).channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            // Encoder for outgoing messages, then the business handler
                            pipeline.addLast(new MyMessageEncoder());
                            pipeline.addLast(new MyClientHandler());
                        }
                    });
            System.out.println("netty client start.");
            // Connect to the server
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9000).sync();
            channelFuture.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }
}
Client encoder
package com.jony.netty.split;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

public class MyMessageEncoder extends MessageToByteEncoder<MyMessageProtocol> {
    @Override
    protected void encode(ChannelHandlerContext ctx, MyMessageProtocol msg, ByteBuf out) throws Exception {
        System.out.println("MyMessageEncoder encode method called");
        // Write the 4-byte length first, then the content
        out.writeInt(msg.getLen());
        out.writeBytes(msg.getContent());
    }
}
Client message handler
Each message to be sent is converted to bytes, and the byte length and the content are then set on the message wrapper class before sending
package com.jony.netty.split;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;

public class MyClientHandler extends SimpleChannelInboundHandler<MyMessageProtocol> {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        for (int i = 0; i < 200; i++) {
            // Note: this translated string is 22 bytes in UTF-8, while the recorded
            // output below shows length 24, so the original run used a 24-byte message.
            String msg = "hello, I am Zhang San!";
            byte[] content = msg.getBytes(CharsetUtil.UTF_8);
            // Create the protocol packet object
            MyMessageProtocol messageProtocol = new MyMessageProtocol();
            messageProtocol.setLen(content.length);
            messageProtocol.setContent(content);
            ctx.writeAndFlush(messageProtocol);
        }
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MyMessageProtocol msg) throws Exception {
        // The client does not expect any replies in this example
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
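Note that the handler takes the length from the encoded bytes, not from `String.length()`. The two differ as soon as the text contains non-ASCII characters, which the short sketch below illustrates. The class `ByteLengthDemo` and the sample strings are hypothetical; the Chinese greeting is written with Unicode escapes so the file compiles regardless of source encoding.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical demo: character count versus UTF-8 byte count.
final class ByteLengthDemo {

    // Number of bytes the string occupies once encoded as UTF-8.
    static int utf8Length(String s) {
        return s.getBytes(StandardCharsets.UTF_8).length;
    }

    public static void main(String[] args) {
        String ascii = "hello";
        String chinese = "\u4f60\u597d"; // the two-character greeting "ni hao"

        // ASCII text: one byte per character.
        System.out.println(ascii.length() + " chars, " + utf8Length(ascii) + " bytes");     // 5 chars, 5 bytes
        // Chinese text: three bytes per character in UTF-8.
        System.out.println(chinese.length() + " chars, " + utf8Length(chinese) + " bytes"); // 2 chars, 6 bytes
    }
}
```

Writing the character count into the length field would make the receiver stop reading too early for any multi-byte text.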
The execution result
Normal reading
MyMessageDecoder decode is called
PooledUnsafeDirectByteBuf(ridx: 0, widx: 1024, cap: 1024)
==== The server received the following message ====
length=24
content=hello, I am Zhang San!
Number of packets received by the server=1

MyMessageDecoder decode is called
PooledUnsafeDirectByteBuf(ridx: 28, widx: 1024, cap: 1024)
==== The server received the following message ====
length=24
content=hello, I am Zhang San!
Number of packets received by the server=2

MyMessageDecoder decode is called
PooledUnsafeDirectByteBuf(ridx: 56, widx: 1024, cap: 1024)
==== The server received the following message ====
length=24
content=hello, I am Zhang San!
Number of packets received by the server=3
In the output above, look at the following line: the buffer currently holds 1024 bytes (widx: 1024), and every message is read out intact, with no sticking.
PooledUnsafeDirectByteBuf(ridx: 0, widx: 1024, cap: 1024)
Message split across reads
MyMessageDecoder decode is called
PooledUnsafeDirectByteBuf(ridx: 980, widx: 1024, cap: 1024)
==== The server received the following message ====
length=24
content=hello, I am Zhang San!
Number of packets received by the server=36

MyMessageDecoder decode is called
PooledUnsafeDirectByteBuf(ridx: 1008, widx: 1024, cap: 1024)
Not enough readable data yet, continue waiting.

MyMessageDecoder decode is called
PooledUnsafeDirectByteBuf(ridx: 1012, widx: 1024, cap: 1024)
Not enough readable data yet, continue waiting.

MyMessageDecoder decode is called
PooledUnsafeDirectByteBuf(ridx: 1012, widx: 4172, cap: 8192)
==== The server received the following message ====
length=24
content=hello, I am Zhang San!
Number of packets received by the server=37
As the output above shows, when not enough bytes have arrived for a complete message, the decoder waits and resumes reading once the rest of the message has arrived, as we can see in this line
PooledUnsafeDirectByteBuf(ridx: 1012, widx: 4172, cap: 8192)
Here ridx is 1012 while the previous widx was 1024, so only 12 bytes were left to read, fewer than the 24-byte content. The decoder therefore reads nothing more and waits until enough bytes have arrived before continuing. This avoids the data corruption that TCP's automatic sticking and splitting would otherwise cause during transmission.
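The offsets in the two logs can be checked with a little arithmetic. The sketch below assumes each frame is a 4-byte length field plus the 24-byte content shown in the output, and that the first read delivered 1024 bytes; the class `FrameOffsets` and its method names are hypothetical.

```java
// Hypothetical helper that reproduces the reader-index values seen in the logs.
final class FrameOffsets {
    static final int HEADER = 4; // size of the length field written by writeInt

    // How many whole frames fit into the readable bytes.
    static int completeFrames(int readable, int bodyLength) {
        return readable / (HEADER + bodyLength);
    }

    // Bytes left over after the last whole frame.
    static int leftover(int readable, int bodyLength) {
        return readable % (HEADER + bodyLength);
    }

    public static void main(String[] args) {
        int body = 24;       // content length from the log
        int readable = 1024; // widx of the first read

        int frames = completeFrames(readable, body);   // 36 whole frames of 28 bytes
        int rest = leftover(readable, body);           // 16 bytes remain
        int ridxAfterFrames = frames * (HEADER + body); // 1008
        int ridxAfterHeader = ridxAfterFrames + HEADER; // 1012, after reading the length
        int bodyAvailable = rest - HEADER;              // 12 of the 24 body bytes

        System.out.println(frames + " frames, ridx " + ridxAfterFrames + " -> " + ridxAfterHeader
                + ", body bytes available: " + bodyAvailable);
        // prints: 36 frames, ridx 1008 -> 1012, body bytes available: 12
    }
}
```

This matches the log: 36 messages are decoded from the first 1024 bytes, the reader index stops at 1012, and message 37 is only completed after the next read.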