Sticky/unpack background

First, we know that TCP is a byte stream, which, like our water stream, has no boundaries. Therefore, it will appear sticky/unpack problems; UDP, like ours, is message-oriented, so UDP does not have this problem; TCP subcontracting: Subcontracting may result in: 1. The size of bytes we write to the socket cache exceeds the buffer maximum; 2. When the network sends data, it is limited by the maximum MSS packet length. Will lead to unpacking; 3. In the data link layer, different data link layers are different and may not use Ethernet protocols. Therefore, the number of data transmission units may be small, leading to IP fragmentation. If the amount of data per request is not large, the acquisition avoids the problem of unpacking; However, another problem arises: TCP sticky packets;

TCP sticky packets

The amount of data sent at a time is variable. It could be just to send a heartbeat, and the next second a big file transfer. So we have to deal with changing business scenarios. TCP improves transmission efficiency and is compatible with more complex environments. For example, in order to improve the success rate of network transmission (), the sender usually collects enough data before sending a TCP segment. So if you have multiple requests and the amount of data is very small, then the data sent by TCP may contain multiple requests, which causes multiple packets to stick together. Similarly, because the receiver does not receive packets from the buffer in time, multiple packets are received (the client sends a piece of data, but the server only receives a small part of the data, and the server takes the data left over from the buffer next time, resulting in sticky packets). This is TCP sticky packet behavior. Code simulation client:

/ * * *@auther:lgb
 * @Date: 2021/5/3 * /
public class BIOTcpClient {
    public static void main(String[] args)  {
        // Define the buffer
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        SocketChannel socketChannel = null;
        try {
            / / open the SocketChannel
            socketChannel = SocketChannel.open();
            // Set to non-blocking mode
            socketChannel.configureBlocking(false);
            // Connect to the service
            socketChannel.connect(new InetSocketAddress("127.0.0.1".2333));
            while (true) {
                // finishConnect is an attempted connection and may return false, so an infinite loop is used to check the connection to make sure it is properly established.
                if (socketChannel.finishConnect()) {
                    System.out.println("Client connected to server");
                    int i = 0;
                    while (i < 5) {
                        // Write every second
                        TimeUnit.MILLISECONDS.sleep(1);
                        String info = "From the client" + (i++) + "A message.";
                        buffer.clear();
                        buffer.put(info.getBytes());
                        buffer.flip();
                        while (buffer.hasRemaining()) {
                            // Write a message to the servicesocketChannel.write(buffer); }}break; }}}catch (IOException | InterruptedException e) {
            e.printStackTrace();
        } finally {
            try {
                if(socketChannel ! =null) {
                    System.out.println("Client Channel closed"); socketChannel.close(); }}catch(IOException e) { e.printStackTrace(); }}}}Copy the code

Server side:

/ * * *@auther:lgb
 * @Date: 2021/5/3 * /
public class BIOTcpServer {
    public static void main(String[] args) {
        ServerSocket serverSocket = null;
        int recvMsgSize = 0;
        InputStream in = null;
        try {
            // open a TCP service that listens on port 2333
            serverSocket = new ServerSocket(2333);
            byte[] recvBuf = new byte[1024];
            while (true) {
                // Check for new client connections, block if not
                Socket clntSocket = serverSocket.accept();
                // Get the client address through the socket connected to the server
                SocketAddress clientAddress = clntSocket.getRemoteSocketAddress();
                System.out.println("Connection successful, processing client:" + clientAddress);
                // Delay 10ms to simulate sticky packet phenomenon
                Thread.sleep(10);
                / / data flow
                in = clntSocket.getInputStream();
                // If the client is not disconnected and does not send data to the server, it is always in the state of ready to read and blocks until data is written (read ready).
                while((recvMsgSize = in.read(recvBuf)) ! = -1) {
                    byte[] temp = new byte[recvMsgSize];
                    System.arraycopy(recvBuf, 0, temp, 0, recvMsgSize);
                    System.out.println("Received client" + clientAddress + "Message content:" + new String(temp)); // Prints the message
                }
                System.out.println("-- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- --"); }}catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            try {
                if(serverSocket ! =null) {
                    System.out.println("Socket down!");
                    serverSocket.close();
                }
                if(in ! =null) {
                    System.out.println("Stream connection closed!"); in.close(); }}catch(IOException e) { e.printStackTrace(); }}}}Copy the code

Specific logic can see notes; The delay time on the server is 10ms. Data is not processed in a timely manner, resulting in packet sticking

Change the server side delay to 0ms to receive the correct data:



The solution

  1. The transmission message is fixed length, the length is known, the receiver can obtain data according to the known length;
  2. Add a special mark to the end of the packet as the end condition;
  3. The message is divided into header and body. The header contains the total length of the message;

Custom protocol types of the solution

2.1 NIO programming

Since BIO is synchronous and non-blocking, it requires a thread to process a connection on the server side. Although thread pools can be used to reuse threads, they are not suitable for most scenarios (like file transfers, where data is always transferred). So, NIO was chosen for programming; For an introduction to NIO, see my other article. Introduction and background of NIO

2.2 Programming Implementation

Divide the message into a header and a body, and receive a packet according to the header. As a packet; Because we need to solve the problem when there is a lot of data in the Buffer (simulating the sticky packet phenomenon) to see whether the data can be segmented normally. Solutions are as follows:

  1. The client sends five consecutive messages to the server
  2. When the selector server receives that the state of the client is readable, it delays for 1s to ensure that all five of the client’s data are in a Buffer. Then, the data is processed to see whether the data is sticky data;

2.2.1 Adding a Length Header

The sending end needs to add a fixed header to the data to be sent, and then send it out. Then, the receiving end needs to combine or split packets according to the length information of this header. The utility class for adding headers is as follows:

/ * * *@auther:lgb
 * @Date: 2021/5/3 * /
public class PacketWrapper {

    private int length;
    private byte[] payload;

    public PacketWrapper(String payload) {
        this.payload = payload.getBytes();
        this.length = this.payload.length;
    }

    public PacketWrapper(byte[] payload) {
        this.payload = payload;
        this.length = this.payload.length;
    }

    public byte[] getBytes() {
        ByteBuffer byteBuffer = ByteBuffer.allocate(this.length + 4);
        byteBuffer.putInt(this.length);
        System.out.println("The length the client sends is:" + this.length);
        byteBuffer.put(payload);
        returnbyteBuffer.array(); }}Copy the code

2.2.2 Client code

/ * * *@auther:lgb
 * @Date: 2021/5/3 * /
public class NIOTcpClient {
    private static final int PORT = 5555;
    private static final String IP_ADDRESS = "localhost";
    public static void main(String[] args) throws IOException {

        Selector selector = Selector.open();
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.configureBlocking(false);
        socketChannel.register(selector, SelectionKey.OP_READ);

        socketChannel.connect(new InetSocketAddress(IP_ADDRESS, PORT));
        while(! socketChannel.finishConnect()) {};new Thread(new SendRunnable(socketChannel)).start();

        System.out.println("Connecting to " + IP_ADDRESS + " on " + PORT);
        while (true) {
            selector.select();
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            while (iterator.hasNext()) {
                SelectionKey selectionKey = iterator.next();
                if (selectionKey.isReadable()) {
                    SocketChannel channel = (SocketChannel) selectionKey.channel();
                    StringBuilder sb = new StringBuilder();
                    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                    while (channel.read(byteBuffer) > 0) {
                        byteBuffer.flip();
                        sb.append(new String(byteBuffer.array()));
                        byteBuffer.clear();
                    }
                    System.out.println("[server] "+ sb.toString()); } iterator.remove(); }}}private static class SendRunnable implements Runnable {

        private SocketChannel socketChannel;
        public SendRunnable(SocketChannel socketChannel) {
            this.socketChannel = socketChannel;
        }

        @Override
        public void run(a) {
            System.out.println("Type to send message:");
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            try {
                socketChannel.socket().setSendBufferSize(102400);
            } catch (SocketException e) {
                e.printStackTrace();
            }
            int i = 0;
            while (i < 5) {
                try {
                    String msg = "The client sent the number" + (i++) + "A message.";
                    // buffer.flip();
                    // buffer.compact(); // It makes sense to use compact() from read mode to write mode;
                    buffer.clear();
                    byte[] bytes = new PacketWrapper(msg).getBytes();
                    System.out.println(msg);
                    buffer.put(bytes);
                    // Change to read mode
                    buffer.flip();
                    while (buffer.hasRemaining()) {
                        // Write a message to the service
                       socketChannel.write(buffer);


                    }
                    Thread.sleep(20);

                } catch (IOException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

Copy the code

2.2.3 Server code

/ * * *@auther:lgb
 * @Date: 2021/5/3 * /
public class NIOTcpServer {

    private static final int PORT = 5555;
    private static ByteBuffer byteBuffer = ByteBuffer.allocate(10240);
    private static int number = 0;

    public static void main(String[] args) throws IOException {

        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.socket().bind(new InetSocketAddress(PORT));
        serverSocketChannel.configureBlocking(false);

        Selector selector = Selector.open();
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

        System.out.println("server start on port " + PORT + "...");

        while (true) {

            selector.select();
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = selectionKeys.iterator();

            while (iterator.hasNext()) {
                SelectionKey selectionKey = iterator.next();

                if(! selectionKey.isValid())continue;

                if (selectionKey.isAcceptable()) {
                    ServerSocketChannel serverChannel = (ServerSocketChannel) selectionKey.channel();
                    SocketChannel socketChannel = serverChannel.accept();
                    socketChannel.configureBlocking(false);
                    SelectionKey key = socketChannel.register(selector, SelectionKey.OP_READ);
                    // start the thread of writing
                    new Thread(new SendRunnable(socketChannel)).start();
                    Socket socket = socketChannel.socket();
                    System.out.println("Get a client, the remote client address: " + socket.getRemoteSocketAddress());
                } else if (selectionKey.isReadable()) {
                    // Read data from channel to buffer
                    SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
                    socketChannel.socket().setReceiveBufferSize(102400); String remoteAddress = socketChannel.socket().getRemoteSocketAddress().toString(); processByHead(socketChannel); } iterator.remove(); }}}private static class SendRunnable implements Runnable {

        private SocketChannel socketChannel;

        public SendRunnable(SocketChannel socketChannel) {
            this.socketChannel = socketChannel;
        }

        public void run(a) {
            BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));
            while (true) {
                try {
                    String msg = bufferedReader.readLine();
                    this.socketChannel.write(ByteBuffer.wrap(msg.getBytes()));
                } catch(IOException e) { e.printStackTrace(); }}}}private static void processByHead(SocketChannel socketChannel) throws IOException {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        while (socketChannel.read(byteBuffer) > 0) {
            int position = byteBuffer.position();
            int limit = byteBuffer.limit();
            byteBuffer.flip();

            while (byteBuffer.remaining() > 4) {
                System.out.println("remaining :" + byteBuffer.remaining());
                System.out.println("position :" + byteBuffer.position());
                System.out.println("limit :" + byteBuffer.limit());
                // Take four bytes from the given position and combine them into an int in byte order
                int length = byteBuffer.getInt();
                if(length ! =0) {
                    System.out.println("The length the client sends is:" + length);
                    if (byteBuffer.remaining() < length) {
                        byteBuffer.position(position);
                        byteBuffer.limit(limit);
                        continue;
                    }

                    byte[] data = new byte[length];
                    byteBuffer.get(data, 0, length);
                    System.out.println(new String(data) + "< -- - >" + number++);

                } else {
                    break; }}// The client still needs to write data; Note that reading half the data and then writing the data makes sense when compact() is usedbyteBuffer.compact(); }}}Copy the code

The tests are as follows: client result:



Server results:



Results analysis:

The server receives position = 0 for the first time. Limit = 175; These data packets are read into Buffer together, and there is a prerequisite for sticky packets. Since the first 4 bytes are read, it is found that the length is 31; So, the last 31 bytes are read first; Then, read the data asThe client sends message 0. So position equals 35 remaining equals 140 is still reading; Read is not finished until Remain < 4;

To sum up: the measured results meet the solution of sticky phenomenon proposed at the beginning;

Gitee code downloadThe relevant reference blog is as follows:

Nio principle