Background: sticky packets and packet splitting
First, TCP is a byte stream. Like a stream of water, it has no message boundaries, so sticky-packet and packet-splitting problems can appear. UDP is message-oriented (each datagram is delivered as a unit), so UDP does not have this problem.
TCP packet splitting may happen when:
1. The number of bytes written to the socket exceeds the free space in the socket send buffer;
2. The data is larger than the MSS (maximum segment size), so TCP has to send it in several segments;
3. The data link layer in use is not necessarily Ethernet, and different link layers have different maximum transmission units. A small MTU on the path can lead to IP fragmentation.
If the amount of data per request is small, packet splitting is mostly avoided. However, another problem arises: TCP sticky packets.
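As a rough worked example of the MSS limit in point 2: the figures below (a 1500-byte Ethernet MTU, 20-byte IPv4 and TCP headers without options) are typical values assumed for illustration, not taken from the article, and the class and method names are hypothetical.

```java
final class SegmentCountSketch {
    // Assumed typical values: Ethernet MTU, IPv4 header and TCP header without options.
    static final int MTU = 1500;
    static final int IP_HEADER = 20;
    static final int TCP_HEADER = 20;
    static final int MSS = MTU - IP_HEADER - TCP_HEADER; // 1460 bytes of payload per segment

    /** Minimum number of TCP segments needed to carry payloadBytes. */
    static int segmentsFor(int payloadBytes) {
        return (payloadBytes + MSS - 1) / MSS; // ceiling division
    }

    public static void main(String[] args) {
        System.out.println("MSS = " + MSS);
        System.out.println("4096-byte write needs at least " + segmentsFor(4096) + " segments");
        System.out.println("35-byte write needs at least " + segmentsFor(35) + " segment");
    }
}
```

Under these assumptions a 4096-byte write is split into at least three segments, while a 35-byte message fits into one segment with plenty of room left, which is why several small messages can end up travelling together.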
TCP sticky packets
The amount of data sent at a time varies. One moment it may be just a heartbeat, and the next second a big file transfer, so we have to deal with changing business scenarios. TCP tries to improve transmission efficiency and to cope with complex network environments. For example, to improve efficiency the sender usually collects enough data before sending a TCP segment. So if there are multiple requests and each carries very little data, a single TCP segment may contain several requests, which causes multiple packets to stick together. Similarly, if the receiver does not read from its receive buffer in time, several packets accumulate there and are returned together (or the server reads only part of what the client sent and picks up the leftover data on the next read, mixed with whatever arrived later). This is TCP sticky-packet behavior. The client code for the simulation:
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.concurrent.TimeUnit;

/**
 * @author lgb
 * @Date 2021/5/3
 */
public class BIOTcpClient {
    public static void main(String[] args) {
        // Define the buffer
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        SocketChannel socketChannel = null;
        try {
            // Open the SocketChannel
            socketChannel = SocketChannel.open();
            // Set to non-blocking mode
            socketChannel.configureBlocking(false);
            // Connect to the server
            socketChannel.connect(new InetSocketAddress("127.0.0.1", 2333));
            while (true) {
                // In non-blocking mode finishConnect() may return false while the connection
                // is still being established, so poll in a loop until it succeeds.
                if (socketChannel.finishConnect()) {
                    System.out.println("Client connected to server");
                    int i = 0;
                    while (i < 5) {
                        // Pause 1 ms between writes
                        TimeUnit.MILLISECONDS.sleep(1);
                        String info = "From the client" + (i++) + "A message.";
                        buffer.clear();
                        buffer.put(info.getBytes());
                        buffer.flip();
                        while (buffer.hasRemaining()) {
                            // Write the message to the server
                            socketChannel.write(buffer);
                        }
                    }
                    break;
                }
            }
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        } finally {
            try {
                if (socketChannel != null) {
                    System.out.println("Client Channel closed");
                    socketChannel.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
Server side:
import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketAddress;

/**
 * @author lgb
 * @Date 2021/5/3
 */
public class BIOTcpServer {
    public static void main(String[] args) {
        ServerSocket serverSocket = null;
        int recvMsgSize = 0;
        InputStream in = null;
        try {
            // Open a TCP service that listens on port 2333
            serverSocket = new ServerSocket(2333);
            byte[] recvBuf = new byte[1024];
            while (true) {
                // Wait for a new client connection; blocks if there is none
                Socket clntSocket = serverSocket.accept();
                // Get the client address from the accepted socket
                SocketAddress clientAddress = clntSocket.getRemoteSocketAddress();
                System.out.println("Connection successful, processing client:" + clientAddress);
                // Delay 10 ms to simulate the sticky-packet phenomenon
                Thread.sleep(10);
                // Input stream of the connection
                in = clntSocket.getInputStream();
                // While the client stays connected, read() blocks until data arrives;
                // it returns -1 once the client has closed the connection.
                while ((recvMsgSize = in.read(recvBuf)) != -1) {
                    byte[] temp = new byte[recvMsgSize];
                    System.arraycopy(recvBuf, 0, temp, 0, recvMsgSize);
                    // Print whatever this single read() returned
                    System.out.println("Received client" + clientAddress + "Message content:" + new String(temp));
                }
                System.out.println("-----------------------------------");
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            try {
                if (serverSocket != null) {
                    System.out.println("Socket down!");
                    serverSocket.close();
                }
                if (in != null) {
                    System.out.println("Stream connection closed!");
                    in.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
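See the code comments for the detailed logic. The server delays 10 ms before it starts reading, so the data is not processed in time and the client's messages are returned stuck together by a single read.
With the server-side delay changed to 0 ms, the messages are received one by one in the author's test (TCP does not guarantee this; it only becomes much more likely):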
The solution
- Use fixed-length messages: the length is known in advance, so the receiver reads exactly that many bytes per message;
- Append a special marker to the end of each packet and treat it as the end condition;
- Divide the message into a header and a body, where the header contains the total length of the message;
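The first two approaches in the list can be sketched as follows (the third one is what the rest of the article implements). This is a minimal illustration under assumptions: the class `FramingSketch`, its method names, the 4-byte frame size and the `'\n'` delimiter are all hypothetical choices, not part of the original code.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

final class FramingSketch {

    /** Fixed length: every message occupies exactly frameSize bytes. */
    static List<String> splitFixed(ByteBuffer in, int frameSize) {
        List<String> messages = new ArrayList<>();
        in.flip();                                   // switch from write mode to read mode
        while (in.remaining() >= frameSize) {
            byte[] frame = new byte[frameSize];
            in.get(frame);
            messages.add(new String(frame, StandardCharsets.UTF_8));
        }
        in.compact();                                // keep the incomplete tail for the next read
        return messages;
    }

    /** Delimiter: every message ends with the given marker byte. */
    static List<String> splitDelimited(ByteBuffer in, byte delimiter) {
        List<String> messages = new ArrayList<>();
        in.flip();
        int start = in.position();
        for (int i = start; i < in.limit(); i++) {
            if (in.get(i) == delimiter) {            // absolute get: position is not moved
                byte[] frame = new byte[i - start];
                in.get(frame);                       // consume the message bytes
                in.get();                            // consume the delimiter itself
                messages.add(new String(frame, StandardCharsets.UTF_8));
                start = in.position();
            }
        }
        in.compact();                                // bytes after the last delimiter stay buffered
        return messages;
    }

    public static void main(String[] args) {
        ByteBuffer fixed = ByteBuffer.allocate(64);
        fixed.put("AAAABBBBCC".getBytes(StandardCharsets.UTF_8));
        System.out.println(splitFixed(fixed, 4));              // [AAAA, BBBB]; "CC" stays buffered

        ByteBuffer lines = ByteBuffer.allocate(64);
        lines.put("ping\npong\npar".getBytes(StandardCharsets.UTF_8));
        System.out.println(splitDelimited(lines, (byte) '\n')); // [ping, pong]; "par" stays buffered
    }
}
```

Both methods leave an incomplete message in the buffer, so the caller can append the next chunk from the socket and call the method again. The delimiter approach additionally requires that the marker byte never occurs inside a message body.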
Solving the problem with a custom protocol
2.1 NIO programming
Since BIO is synchronous and blocking, the server needs one thread per connection. A thread pool can be used to reuse threads, but that still does not suit many scenarios (such as file transfers, where data is transferred continuously and each connection keeps its thread busy). So NIO was chosen for the implementation. For an introduction to NIO, see my other article: Introduction and background of NIO
2.2 Programming Implementation
Divide the message into a header and a body, and use the length stored in the header to decide where one packet ends. To check whether the data can be split correctly when the Buffer holds a lot of data at once (simulating the sticky-packet phenomenon), the test is set up as follows:
- The client sends five consecutive messages to the server;
- When the server's selector reports that the client channel is readable, the server delays for 1 s to make sure all five of the client's messages end up in one Buffer. Then the data is processed to see whether the stuck-together data is split correctly;
2.2.1 Adding a Length Header
The sender prepends a fixed-length header to the data before sending it. The receiver then uses the length stored in this header to combine or split packets. The utility class for adding the header is as follows:
import java.nio.ByteBuffer;

/**
 * @author lgb
 * @Date 2021/5/3
 */
public class PacketWrapper {
    private int length;
    private byte[] payload;

    public PacketWrapper(String payload) {
        this.payload = payload.getBytes();
        this.length = this.payload.length;
    }

    public PacketWrapper(byte[] payload) {
        this.payload = payload;
        this.length = this.payload.length;
    }

    // Returns the packet on the wire: a 4-byte length header followed by the payload
    public byte[] getBytes() {
        ByteBuffer byteBuffer = ByteBuffer.allocate(this.length + 4);
        byteBuffer.putInt(this.length);
        System.out.println("The length the client sends is:" + this.length);
        byteBuffer.put(payload);
        return byteBuffer.array();
    }
}
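To make the wire format concrete, here is a small round-trip sketch. It is self-contained, so `encode` reproduces the layout that `PacketWrapper.getBytes()` writes (a 4-byte big-endian length, then the payload); the class `LengthPrefixSketch` and the `decode` method are hypothetical names introduced only for this illustration, and UTF-8 is an assumed encoding.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

final class LengthPrefixSketch {

    /** Same layout as PacketWrapper.getBytes(): 4-byte big-endian length, then the payload. */
    static byte[] encode(String message) {
        byte[] payload = message.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + payload.length)
                .putInt(payload.length)
                .put(payload)
                .array();
    }

    /** Extracts every complete packet; an incomplete packet stays in the buffer. */
    static List<String> decode(ByteBuffer in) {
        List<String> messages = new ArrayList<>();
        in.flip();                                // switch to read mode
        while (in.remaining() >= 4) {
            in.mark();                            // remember where this header starts
            int length = in.getInt();
            if (in.remaining() < length) {
                in.reset();                       // body not complete yet: un-read the header
                break;
            }
            byte[] payload = new byte[length];
            in.get(payload);
            messages.add(new String(payload, StandardCharsets.UTF_8));
        }
        in.compact();                             // back to write mode, leftover bytes kept
        return messages;
    }

    public static void main(String[] args) {
        byte[] first = encode("hello");
        byte[] second = encode("world!");

        ByteBuffer received = ByteBuffer.allocate(64);
        received.put(first);
        received.put(second, 0, 6);               // second packet arrives only partially
        System.out.println(decode(received));     // [hello]

        received.put(second, 6, second.length - 6);
        System.out.println(decode(received));     // [world!]
    }
}
```

The example feeds the decoder one complete packet plus the first 6 bytes of the next, which covers both sticking and splitting in one run. A production decoder would also validate the length field (for example reject negative or oversized values) before allocating the payload array.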
2.2.2 Client code
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

/**
 * @author lgb
 * @Date 2021/5/3
 */
public class NIOTcpClient {
    private static final int PORT = 5555;
    private static final String IP_ADDRESS = "localhost";

    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.configureBlocking(false);
        socketChannel.register(selector, SelectionKey.OP_READ);
        socketChannel.connect(new InetSocketAddress(IP_ADDRESS, PORT));
        // Busy-wait until the non-blocking connect has completed
        while (!socketChannel.finishConnect()) {
        }
        new Thread(new SendRunnable(socketChannel)).start();
        System.out.println("Connecting to " + IP_ADDRESS + " on " + PORT);
        while (true) {
            selector.select();
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            while (iterator.hasNext()) {
                SelectionKey selectionKey = iterator.next();
                if (selectionKey.isReadable()) {
                    SocketChannel channel = (SocketChannel) selectionKey.channel();
                    StringBuilder sb = new StringBuilder();
                    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                    while (channel.read(byteBuffer) > 0) {
                        byteBuffer.flip();
                        // Decode only the bytes that were actually read, not the whole backing array
                        sb.append(new String(byteBuffer.array(), 0, byteBuffer.limit()));
                        byteBuffer.clear();
                    }
                    System.out.println("[server] " + sb.toString());
                }
                iterator.remove();
            }
        }
    }

    private static class SendRunnable implements Runnable {
        private SocketChannel socketChannel;

        public SendRunnable(SocketChannel socketChannel) {
            this.socketChannel = socketChannel;
        }

        @Override
        public void run() {
            System.out.println("Type to send message:");
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            try {
                socketChannel.socket().setSendBufferSize(102400);
            } catch (SocketException e) {
                e.printStackTrace();
            }
            int i = 0;
            while (i < 5) {
                try {
                    String msg = "The client sent the number" + (i++) + "A message.";
                    // buffer.flip();
                    // buffer.compact(); // compact() only makes sense when switching from read mode back to write mode
                    buffer.clear();
                    // Prepend the 4-byte length header
                    byte[] bytes = new PacketWrapper(msg).getBytes();
                    System.out.println(msg);
                    buffer.put(bytes);
                    // Change to read mode
                    buffer.flip();
                    while (buffer.hasRemaining()) {
                        // Write the message to the server
                        socketChannel.write(buffer);
                    }
                    Thread.sleep(20);
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
2.2.3 Server code
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

/**
 * @author lgb
 * @Date 2021/5/3
 */
public class NIOTcpServer {
    private static final int PORT = 5555;
    private static ByteBuffer byteBuffer = ByteBuffer.allocate(10240);
    private static int number = 0;

    public static void main(String[] args) throws IOException {
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.socket().bind(new InetSocketAddress(PORT));
        serverSocketChannel.configureBlocking(false);
        Selector selector = Selector.open();
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        System.out.println("server start on port " + PORT + "...");
        while (true) {
            selector.select();
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = selectionKeys.iterator();
            while (iterator.hasNext()) {
                SelectionKey selectionKey = iterator.next();
                // Remove the key first so that it is never left in the selected set
                iterator.remove();
                if (!selectionKey.isValid()) continue;
                if (selectionKey.isAcceptable()) {
                    ServerSocketChannel serverChannel = (ServerSocketChannel) selectionKey.channel();
                    SocketChannel socketChannel = serverChannel.accept();
                    socketChannel.configureBlocking(false);
                    socketChannel.register(selector, SelectionKey.OP_READ);
                    // Start the writing thread
                    new Thread(new SendRunnable(socketChannel)).start();
                    Socket socket = socketChannel.socket();
                    System.out.println("Get a client, the remote client address: " + socket.getRemoteSocketAddress());
                } else if (selectionKey.isReadable()) {
                    // Read data from the channel into the buffer
                    SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
                    socketChannel.socket().setReceiveBufferSize(102400);
                    processByHead(socketChannel);
                }
            }
        }
    }

    private static class SendRunnable implements Runnable {
        private SocketChannel socketChannel;

        public SendRunnable(SocketChannel socketChannel) {
            this.socketChannel = socketChannel;
        }

        public void run() {
            BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));
            while (true) {
                try {
                    String msg = bufferedReader.readLine();
                    if (msg == null) break; // end of standard input
                    this.socketChannel.write(ByteBuffer.wrap(msg.getBytes()));
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    private static void processByHead(SocketChannel socketChannel) throws IOException {
        try {
            // Delay 1 s so that all of the client's messages pile up in the receive buffer
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        int read;
        while ((read = socketChannel.read(byteBuffer)) > 0) {
            // Switch to read mode
            byteBuffer.flip();
            // A header is exactly 4 bytes, so 4 remaining bytes are enough to read it
            while (byteBuffer.remaining() >= 4) {
                System.out.println("remaining :" + byteBuffer.remaining());
                System.out.println("position :" + byteBuffer.position());
                System.out.println("limit :" + byteBuffer.limit());
                // Remember where this header starts in case the body is still incomplete
                byteBuffer.mark();
                // Take four bytes from the current position and combine them into an int in byte order
                int length = byteBuffer.getInt();
                if (length != 0) {
                    System.out.println("The length the client sends is:" + length);
                    if (byteBuffer.remaining() < length) {
                        // Half a packet: rewind to the header and wait for more data
                        byteBuffer.reset();
                        break;
                    }
                    byte[] data = new byte[length];
                    byteBuffer.get(data, 0, length);
                    System.out.println(new String(data) + "< -- - >" + number++);
                } else {
                    break;
                }
            }
            // The client may still be writing. compact() keeps the unread half packet
            // and switches the buffer back to write mode for the next read.
            byteBuffer.compact();
        }
        if (read == -1) {
            // The client closed the connection
            socketChannel.close();
        }
    }
}
The tests are as follows: client result:
Server results:
Results analysis:
On the first read the server reports position = 0 and limit = 175: all five packets were read into the Buffer together, which is the precondition for sticky packets. The first 4 bytes are read and give a length of 31, so the next 31 bytes are read and decoded as the message body, "The client sends message 0". Each packet therefore occupies 4 + 31 = 35 bytes, and 5 × 35 = 175 (the figures come from the author's test run; the exact length depends on the message text and its encoding). Position is now 35 and remaining is 140, so reading continues in the same way, and it stops only when remaining < 4.
To sum up: the measured results match the sticky-packet solution proposed at the beginning.
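The position arithmetic in this analysis can be replayed without a network. The sketch below fills a buffer with five packets of 4 + 31 bytes (the sizes from the run described above) and records the position after each packet is consumed; the class `PositionWalkthrough` and its method are hypothetical, and the payload bytes are just zeros because only the sizes matter here.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

final class PositionWalkthrough {

    /** Returns the buffer position after each of frameCount packets has been consumed. */
    static int[] positionsAfterEachFrame(int frameCount, int payloadLength) {
        ByteBuffer buffer = ByteBuffer.allocate(frameCount * (4 + payloadLength));
        for (int i = 0; i < frameCount; i++) {
            // 4-byte length header followed by a dummy payload of that length
            buffer.putInt(payloadLength).put(new byte[payloadLength]);
        }
        buffer.flip();                                    // position = 0, limit = total bytes

        int[] positions = new int[frameCount];
        for (int i = 0; i < frameCount; i++) {
            int length = buffer.getInt();                 // advances position by 4
            buffer.position(buffer.position() + length);  // skip over the payload
            positions[i] = buffer.position();
        }
        return positions;
    }

    public static void main(String[] args) {
        // Five packets with 31-byte payloads: limit is 5 * 35 = 175
        System.out.println(Arrays.toString(positionsAfterEachFrame(5, 31)));
        // prints [35, 70, 105, 140, 175]
    }
}
```

After the first packet the position is 35 and 175 - 35 = 140 bytes remain, which matches the values printed by the server; after the fifth packet nothing remains, so the inner loop ends.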
Gitee code download
The relevant reference blog is as follows:
Nio principle