Interpreting RecvByteBufAllocator

In our last reading of message processing, we noticed that RecvByteBufAllocator kept appearing and got in the way of following the code logic.

RecvByteBufAllocator

This is the top-level interface that AdaptiveRecvByteBufAllocator ultimately implements. Let's analyze it piece by piece.

public interface RecvByteBufAllocator {
    /**
     * Creates a new handle.  The handle provides the actual operations and keeps the internal information which is
     * required for predicting an optimal buffer capacity.
     */
    // Creates a handle that predicts the size of the next ByteBuf to allocate
    Handle newHandle();

    /**
     * @deprecated Use {@link ExtendedHandle}.
     */
    @Deprecated
    interface Handle {
        /**
         * Creates a new receive buffer whose capacity is probably large enough to read all inbound data and small
         * enough not to waste its space.
         */
        // Allocates the ByteBuf. The argument matters: the handle only supplies the predicted size
        // Argument: alloc - The object that actually allocates memory
        ByteBuf allocate(ByteBufAllocator alloc);

        /**
         * Similar to {@link #allocate(ByteBufAllocator)} except that it does not allocate anything but just tells the
         * capacity.
         */
        // Get the predicted value of ByteBuf Size
        int guess();

        /**
         * Reset any counters that have accumulated and recommend how many messages/bytes should be read for the next
         * read loop.
         * <p>
         * This may be used by {@link #continueReading()} to determine if the read operation should complete.
         * </p>
         * This is only ever a hint and may be ignored by the implementation.
         * @param config The channel configuration which may impact this object's behavior.
         */
        void reset(ChannelConfig config);

        /**
         * Increment the number of messages that have been read for the current read loop.
         * @param numMessages The amount to increment by.
         */
        // Increases the number of read messages
        void incMessagesRead(int numMessages);

        /**
         * Set the bytes that have been read for the last read operation.
         * This may be used to increment the number of bytes that have been read.
         * @param bytes The number of bytes from the previous read operation. This may be negative if an read error
         * occurs. If a negative value is seen it is expected to be return on the next call to
         * {@link #lastBytesRead()}. A negative value will signal a termination condition enforced externally
         * to this class and is not required to be enforced in {@link #continueReading()}.
         */
        // Set the Size of the last read from a Channel
        void lastBytesRead(int bytes);

        /**
         * Get the amount of bytes for the previous read operation.
         * @return The amount of bytes for the previous read operation.
         */
        // Get the amount of data last read from Channel
        int lastBytesRead();

        /**
         * Set how many bytes the read operation will (or did) attempt to read.
         * @param bytes How many bytes the read operation will (or did) attempt to read.
         */
        // Sets the amount of data that you want to read, or that has already been read
        void attemptedBytesRead(int bytes);

        /**
         * Get how many bytes the read operation will (or did) attempt to read.
         * @return How many bytes the read operation will (or did) attempt to read.
         */
        // Get the amount of data that you want to read, or that has already been read
        int attemptedBytesRead();

        /**
         * Determine if the current read loop should continue.
         * @return {@code true} if the read loop should continue reading. {@code false} if the read loop is complete.
         */
        // Determines whether to keep reading; this drives the read loop in NioMessageUnsafe.read() and NioByteUnsafe.read()
        boolean continueReading();

        /**
         * The read has completed.
         */
        // The current read loop has finished
        void readComplete();
    }
    ...

MaxMessagesRecvByteBufAllocator

// A preliminary enhancement to RecvByteBufAllocator
public interface MaxMessagesRecvByteBufAllocator extends RecvByteBufAllocator {
    /**
     * Returns the maximum number of messages to read per read loop.
     * a {@link ChannelInboundHandler#channelRead(ChannelHandlerContext, Object) channelRead()} event.
     * If this value is greater than 1, an event loop might attempt to read multiple times to procure multiple messages.
     */
    // Returns the maximum number of messages read per read loop - each pull from the Channel counts as one message
    int maxMessagesPerRead();

    /**
     * Sets the maximum number of messages to read per read loop.
     * If this value is greater than 1, an event loop might attempt to read multiple times to procure multiple messages.
     */
    // Sets the maximum number of messages read per read loop
    MaxMessagesRecvByteBufAllocator maxMessagesPerRead(int maxMessagesPerRead);
}

Next, let's look at the implementation class of MaxMessagesRecvByteBufAllocator.

DefaultMaxMessagesRecvByteBufAllocator

public abstract class DefaultMaxMessagesRecvByteBufAllocator implements MaxMessagesRecvByteBufAllocator {
    // Indicates the maximum number of messages that can be read per read loop
    private volatile int maxMessagesPerRead;
    private volatile boolean respectMaybeMoreData = true;
    ...
    // Enhancement of ExtendedHandle
    // The outer class only supplies parameters to the inner class; it does nothing else.
    public abstract class MaxMessageHandle implements ExtendedHandle {
        // The channel's config
        private ChannelConfig config;
        // Maximum number of messages that can be read in each read loop
        private int maxMessagePerRead;
        // Number of messages read so far in this read loop
        private int totalMessages;
        // Total number of bytes read so far in this read loop
        private int totalBytesRead;
        // Number of bytes the read operation will attempt (or did attempt) to read
        private int attemptedBytesRead;
        // Number of bytes returned by the last read
        private int lastBytesRead;
        // true by default
        private final boolean respectMaybeMoreData = DefaultMaxMessagesRecvByteBufAllocator.this.respectMaybeMoreData;
        private final UncheckedBooleanSupplier defaultMaybeMoreSupplier = new UncheckedBooleanSupplier() {
            @Override
            public boolean get() {
                // Compares the attempted read size with the number of bytes actually read last time
                // true  - the buffer was filled, so the Channel may still hold unread data and reading should continue
                // false - either a. the allocated ByteBuf was larger than the remaining data, or
                //                b. the channel was closed and lastBytesRead == -1
                //         in both cases no further reading is needed
                return attemptedBytesRead == lastBytesRead;
            }
        };

        /**
         * Only {@link ChannelConfig#getMaxMessagesPerRead()} is used.
         */
        // Reset the current handler
        @Override
        public void reset(ChannelConfig config) {
            this.config = config;
            // Resets the maximum number of messages that can be read in one read loop - 16 by default, for both servers and clients
            maxMessagePerRead = maxMessagesPerRead();
            // Reset the statistics field
            totalMessages = totalBytesRead = 0;
        }

        // Argument: alloc - The allocator that actually allocates memory
        @Override
        public ByteBuf allocate(ByteBufAllocator alloc) {
            // guess() returns the predicted ByteBuf size, a read size suited to what the read loop has seen so far
            // alloc.ioBuffer(size) - creates a ByteBuf of that predicted size
            return alloc.ioBuffer(guess());
        }

        @Override
        public final void incMessagesRead(int amt) {
            totalMessages += amt;
        }

        @Override
        public void lastBytesRead(int bytes) {
            // Record the number of bytes returned by the last read
            lastBytesRead = bytes;
            if (bytes > 0) {
                totalBytesRead += bytes; // Update the running total of bytes read
            }
        }

        @Override
        public final int lastBytesRead() {
            return lastBytesRead;
        }

        // Core method - this is what the outer read loop calls to decide whether to keep going
        @Override
        public boolean continueReading() {
            return continueReading(defaultMaybeMoreSupplier);
        }

        @Override
        public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
            // The read loop continues only when all four conditions are true
            // Condition 1: config.isAutoRead() - true by default
            // Condition 2: maybeMoreDataSupplier.get() - whether the last read returned exactly as many bytes as
            //              were attempted, i.e. whether there is probably more data to read
            // Condition 3: totalMessages < maxMessagePerRead - one unsafe.read() call reads a channel at most 16 times
            // Condition 4: totalBytesRead > 0
            //   Client: normally true, because every successful read in the loop adds its bytes to totalBytesRead;
            //           it only becomes false if the total overflows past Integer.MAX_VALUE
            //   Server: unsafe.read() does not update totalBytesRead, so this is false - a server read loop
            //           therefore runs only once
            return config.isAutoRead() &&
                   (!respectMaybeMoreData || maybeMoreDataSupplier.get()) &&
                   totalMessages < maxMessagePerRead &&
                   totalBytesRead > 0;
        }

        @Override
        public void readComplete() {
        }

        @Override
        public int attemptedBytesRead() {
            return attemptedBytesRead;
        }

        @Override
        public void attemptedBytesRead(int bytes) {
            attemptedBytesRead = bytes;
        }
        // Returns the total amount of data read
        protected final int totalBytesRead() {
            return totalBytesRead < 0 ? Integer.MAX_VALUE : totalBytesRead;
        }
    }
}

As we can see, the outer class really only exists to supply parameters to the inner class, that is, to initialize the inner class from the outer class's state.

In this class, the method we care most about is continueReading(), which decides whether the read loop keeps going. To continue, four conditions must hold at the same time; see the code comments for details.
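
The four conditions can be tried out in isolation. The following is only a minimal sketch, not Netty's code: the class name ContinueReadingSketch and the flattened parameter list are assumptions made for illustration, standing in for the fields that MaxMessageHandle keeps internally.

```java
public class ContinueReadingSketch {

    // Hypothetical helper: the same four-part test as MaxMessageHandle.continueReading(),
    // with the handle's fields passed in as plain arguments.
    static boolean continueReading(boolean autoRead,
                                   boolean respectMaybeMoreData,
                                   int attemptedBytesRead,
                                   int lastBytesRead,
                                   int totalMessages,
                                   int maxMessagesPerRead,
                                   int totalBytesRead) {
        // Mirrors defaultMaybeMoreSupplier: a completely filled buffer hints at more data.
        boolean maybeMoreData = attemptedBytesRead == lastBytesRead;
        return autoRead
                && (!respectMaybeMoreData || maybeMoreData)
                && totalMessages < maxMessagesPerRead
                && totalBytesRead > 0;
    }

    public static void main(String[] args) {
        // The 2048-byte buffer was filled: keep reading.
        System.out.println(continueReading(true, true, 2048, 2048, 1, 16, 2048));   // true
        // Only 500 of 2048 bytes arrived: the socket is probably drained.
        System.out.println(continueReading(true, true, 2048, 500, 1, 16, 500));     // false
        // 16 messages already read: the per-loop budget is used up.
        System.out.println(continueReading(true, true, 2048, 2048, 16, 16, 32768)); // false
        // No bytes were counted, which is the server-side case noted in the code comments.
        System.out.println(continueReading(true, true, 2048, 2048, 1, 16, 0));      // false
    }
}
```

Any single failing condition is enough to end the loop, which is why a partially filled buffer stops reading even when the message budget is far from exhausted.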

Next comes the class that matters most: AdaptiveRecvByteBufAllocator.

AdaptiveRecvByteBufAllocator

/**
 * The {@link RecvByteBufAllocator} that automatically increases and
 * decreases the predicted buffer size on feed back.
 * <p>
 * It gradually increases the expected number of readable bytes if the previous
 * read fully filled the allocated buffer.  It gradually decreases the expected
 * number of readable bytes if the read operation was not able to fill a certain
 * amount of the allocated buffer two times consecutively.  Otherwise, it keeps
 * returning the same prediction.
 */
public class AdaptiveRecvByteBufAllocator extends DefaultMaxMessagesRecvByteBufAllocator {
    // Default minimum value
    static final int DEFAULT_MINIMUM = 64;
    // Use an initial value that is bigger than the common MTU of 1500
    static final int DEFAULT_INITIAL = 2048; // Default initial value
    static final int DEFAULT_MAXIMUM = 65536; // Default maximum value
    // Step by which the index grows: 4
    private static final int INDEX_INCREMENT = 4;
    // Step by which the index shrinks: 1
    private static final int INDEX_DECREMENT = 1;
    // Size table - holds the candidate sizes that guess() chooses from
    // guess() returns a size from this table, and allocate() then uses it to allocate the ByteBuf
    private static final int[] SIZE_TABLE;
    // Static initializer
    static {
        List<Integer> sizeTable = new ArrayList<Integer>();
        // Below 512, sizes grow in steps of 16
        for (int i = 16; i < 512; i += 16) {
            sizeTable.add(i);
        }

        // Suppress a warning since i becomes negative when an integer overflow happens
        // From 512 upward, the size doubles each time
        // The loop ends once i overflows past Integer.MAX_VALUE and becomes negative
        for (int i = 512; i > 0; i <<= 1) { // lgtm[java/constant-comparison]
            sizeTable.add(i);
        }

        // Finally, copy the list into SIZE_TABLE
        SIZE_TABLE = new int[sizeTable.size()];
        for (int i = 0; i < SIZE_TABLE.length; i ++) {
            SIZE_TABLE[i] = sizeTable.get(i);
        }
    }

    /**
     * @deprecated There is state for {@link #maxMessagesPerRead()} which is typically based upon channel type.
     */
    @Deprecated
    public static final AdaptiveRecvByteBufAllocator DEFAULT = new AdaptiveRecvByteBufAllocator();
    // Binary search: finds the size-table index for size (the value at that index may be slightly smaller or larger than size, but it is a neighbouring entry)
    private static int getSizeTableIndex(final int size) {
        for (int low = 0, high = SIZE_TABLE.length - 1;;) {
            if (high < low) {
                return low;
            }
            if (high == low) {
                return high;
            }

            int mid = low + high >>> 1;
            int a = SIZE_TABLE[mid];
            int b = SIZE_TABLE[mid + 1];
            if (size > b) {
                low = mid + 1;
            } else if (size < a) {
                high = mid - 1;
            } else if (size == a) {
                return mid;
            } else {
                return mid + 1;
            }
        }
    }

    // The inner class HandleImpl is initialized from the outer class's fields
    // It extends MaxMessageHandle
    private final class HandleImpl extends MaxMessageHandle {
        private final int minIndex;
        private final int maxIndex;
        private int index;
        // Specifies the capacity of the next allocated byteBuf
        private int nextReceiveBufferSize;
        private boolean decreaseNow;

        HandleImpl(int minIndex, int maxIndex, int initial) {
            this.minIndex = minIndex;
            this.maxIndex = maxIndex;
            // Look up the index of the initial size (2048 by default) in SIZE_TABLE
            index = getSizeTableIndex(initial);
            // nextReceiveBufferSize starts at 2048 with the defaults
            nextReceiveBufferSize = SIZE_TABLE[index];
        }

        @Override
        public void lastBytesRead(int bytes) {
            // If we read as much as we asked for we should check if we need to ramp up the size of our next guess.
            // This helps adjust more quickly when large amounts of data is pending and can avoid going back to
            // the selector to check for more data. Going back to the selector can add significant latency for large
            // data transfers.
            // The condition is true when the read filled the buffer, i.e. bytes read equals bytes attempted
            if (bytes == attemptedBytesRead()) {
                // record() may update nextReceiveBufferSize
                // Since the predicted capacity was filled, the Channel may still hold a lot of data,
                // so a larger ByteBuf may be worth allocating next time
                // Argument: the number of bytes actually read this time
                record(bytes);
            }
            super.lastBytesRead(bytes);
        }

        @Override
        public int guess() {
            return nextReceiveBufferSize;
        }
        // Argument: the number of bytes actually read
        private void record(int actualReadBytes) {
            // For example:
            // Assume SIZE_TABLE[index] = 2048, so SIZE_TABLE[index - 1] = 1024
            // If a read returns <= 1024 bytes twice in a row, the channel is not delivering much data,
            // so there is no need to keep allocating such a large ByteBuf
            if (actualReadBytes <= SIZE_TABLE[max(0, index - INDEX_DECREMENT)]) {
                // false the first time this branch is reached
                // true the second consecutive time
                if (decreaseNow) {
                    // Decrease the index, keeping SIZE_TABLE[index] >= minimum
                    index = max(index - INDEX_DECREMENT, minIndex);
                    // Set the size of the next ByteBuf - it shrinks here
                    nextReceiveBufferSize = SIZE_TABLE[index];
                    // nextReceiveBufferSize is only lowered after the outer condition holds twice in a row
                    decreaseNow = false;
                } else {
                    // First time: just set the flag
                    decreaseNow = true;
                }
            } else if (actualReadBytes >= nextReceiveBufferSize) {
                // Growth logic - the read filled (or exceeded) the predicted size, so the Channel may hold much more data
                // Increase the index, keeping SIZE_TABLE[index] <= maximum
                index = min(index + INDEX_INCREMENT, maxIndex);
                // Set the size of the next ByteBuf - it grows here
                nextReceiveBufferSize = SIZE_TABLE[index];
                decreaseNow = false; // Cancel any pending decrease
            }
        }

        @Override
        public void readComplete() {
            // The argument is the total number of bytes read during this read loop
            // This rescales the ByteBuf size once more:
            // it judges the Channel's data volume from that total, so that the next time data arrives
            // a suitably sized ByteBuf is allocated straight away
            record(totalBytesRead());
        }
    }

    private final int minIndex;
    private final int maxIndex;
    private final int initial;

    /**
     * Creates a new predictor with the default parameters.  With the default
     * parameters, the expected buffer size starts from {@code 1024}, does not
     * go down below {@code 64}, and does not go up above {@code 65536}.
     */
    public AdaptiveRecvByteBufAllocator() {
        // Argument 1: 64
        // Argument 2: 2048
        // Argument 3: 65536
        this(DEFAULT_MINIMUM, DEFAULT_INITIAL, DEFAULT_MAXIMUM);
    }

    /**
     * Creates a new predictor with the specified parameters.
     *
     * @param minimum  the inclusive lower bound of the expected buffer size
     * @param initial  the initial buffer size when no feed back was received
     * @param maximum  the inclusive upper bound of the expected buffer size
     */
    // With the defaults - argument 1: 64, argument 2: 2048, argument 3: 65536
    // Assigns minIndex, maxIndex and initial
    public AdaptiveRecvByteBufAllocator(int minimum, int initial, int maximum) {
        checkPositive(minimum, "minimum");
        if (initial < minimum) {
            throw new IllegalArgumentException("initial: " + initial);
        }
        if (maximum < initial) {
            throw new IllegalArgumentException("maximum: " + maximum);
        }
        // Binary search algorithm, find the subscript of the nearest minimum in the size table.
        int minIndex = getSizeTableIndex(minimum);
        if (SIZE_TABLE[minIndex] < minimum) {
            // Ensure that SIZE_TABLE[minIndex] >= minimum
            this.minIndex = minIndex + 1;
        } else {
            // Otherwise the index found is already a suitable minIndex
            this.minIndex = minIndex;
        }

        // Binary search for maximum in the size table; the value at the returned index may be slightly smaller or larger than maximum
        int maxIndex = getSizeTableIndex(maximum);
        if (SIZE_TABLE[maxIndex] > maximum) {
            // Make sure that SIZE_TABLE[maxIndex] <= maximum - make sure that you do not exceed the threshold
            this.maxIndex = maxIndex - 1;
        } else {
            // find the appropriate maxIndex
            this.maxIndex = maxIndex;
        }
        // Initial value: 2048
        this.initial = initial;
    }

    @SuppressWarnings("deprecation")
    // Create HandleImpl
    @Override
    public Handle newHandle() {
        // These arguments are the values set up in the AdaptiveRecvByteBufAllocator constructor
        // Argument 1: the SIZE_TABLE index for 64
        // Argument 2: the SIZE_TABLE index for 65536
        // Argument 3: 2048
        return new HandleImpl(minIndex, maxIndex, initial);
    }

    @Override
    public AdaptiveRecvByteBufAllocator respectMaybeMoreData(boolean respectMaybeMoreData) {
        super.respectMaybeMoreData(respectMaybeMoreData);
        return this;
    }
}

The method to focus on here is newHandle(), which creates a HandleImpl instance. HandleImpl extends DefaultMaxMessagesRecvByteBufAllocator.MaxMessageHandle, MaxMessageHandle implements RecvByteBufAllocator.ExtendedHandle, and ExtendedHandle in turn extends RecvByteBufAllocator.Handle. As you can see, everything we looked at earlier is connected.

This class holds an int array, SIZE_TABLE, containing the candidate sizes used to predict how much memory a ByteBuf needs. guess() returns the size of the next ByteBuf from it.

From the static initializer we can see the values it stores: 16, 32, 48, ..., 496, and then 512, 1024, 2048, ..., doubling for as long as the value stays within Integer.MAX_VALUE (so the largest entry is 2^30 = 1073741824).
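
To check these values, the two loops of the static initializer can be reproduced in a standalone class. This is a sketch for illustration: the class name SizeTableSketch and the method buildSizeTable() are made up here, while the loop bounds are the ones from the listing above.

```java
import java.util.ArrayList;
import java.util.List;

public class SizeTableSketch {

    // Rebuilds the size table with the same two loops as the static initializer.
    static int[] buildSizeTable() {
        List<Integer> sizes = new ArrayList<>();
        // 16, 32, ..., 496: steps of 16 below 512
        for (int i = 16; i < 512; i += 16) {
            sizes.add(i);
        }
        // 512, 1024, ...: doubling until the shift overflows and i turns negative
        for (int i = 512; i > 0; i <<= 1) {
            sizes.add(i);
        }
        int[] table = new int[sizes.size()];
        for (int i = 0; i < table.length; i++) {
            table[i] = sizes.get(i);
        }
        return table;
    }

    public static void main(String[] args) {
        int[] table = buildSizeTable();
        System.out.println(table.length);            // 53
        System.out.println(table[0]);                // 16
        System.out.println(table[30]);               // 496
        System.out.println(table[31]);               // 512
        System.out.println(table[table.length - 1]); // 1073741824
    }
}
```

The table therefore has 53 entries: 31 small sizes in steps of 16, followed by 22 powers of two from 2^9 to 2^30.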

What we care about most at this point is the first value guess() returns, which is nextReceiveBufferSize. It is initialized in the HandleImpl constructor, i.e. nextReceiveBufferSize = SIZE_TABLE[index].

In this Netty version the initial size is 2048 (1024 << 1), so the first ByteBuf created has a size of 2048.

The ByteBuf size is bounded by minIndex and maxIndex, the SIZE_TABLE indices for DEFAULT_MINIMUM and DEFAULT_MAXIMUM. Both are computed by the binary search in getSizeTableIndex(). The table value at the returned index may be slightly larger or smaller than the requested size, which is why the constructor then moves the index by one where needed to stay inside the bounds.
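
The index lookup and the one-step correction can be exercised on their own. In the sketch below, getSizeTableIndex() is copied from the listing above, while the class name SizeTableIndexSketch and the helpers minIndexFor() and maxIndexFor() are hypothetical names that wrap the two correction branches of the constructor.

```java
public class SizeTableIndexSketch {

    static final int[] SIZE_TABLE = new int[53];
    static {
        int n = 0;
        for (int i = 16; i < 512; i += 16) { SIZE_TABLE[n++] = i; }
        for (int i = 512; i > 0; i <<= 1) { SIZE_TABLE[n++] = i; }
    }

    // Same binary search as in the listing above.
    static int getSizeTableIndex(final int size) {
        for (int low = 0, high = SIZE_TABLE.length - 1;;) {
            if (high < low) {
                return low;
            }
            if (high == low) {
                return high;
            }
            int mid = low + high >>> 1;
            int a = SIZE_TABLE[mid];
            int b = SIZE_TABLE[mid + 1];
            if (size > b) {
                low = mid + 1;
            } else if (size < a) {
                high = mid - 1;
            } else if (size == a) {
                return mid;
            } else {
                return mid + 1;
            }
        }
    }

    // Hypothetical helper: lower bound, so that SIZE_TABLE[result] >= minimum.
    static int minIndexFor(int minimum) {
        int i = getSizeTableIndex(minimum);
        return SIZE_TABLE[i] < minimum ? i + 1 : i;
    }

    // Hypothetical helper: upper bound, so that SIZE_TABLE[result] <= maximum.
    static int maxIndexFor(int maximum) {
        int i = getSizeTableIndex(maximum);
        return SIZE_TABLE[i] > maximum ? i - 1 : i;
    }

    public static void main(String[] args) {
        System.out.println(minIndexFor(64));         // 3
        System.out.println(getSizeTableIndex(2048)); // 33
        System.out.println(maxIndexFor(65536));      // 38
        // 1000 is not in the table: the search lands on 512 (index 31)
        System.out.println(getSizeTableIndex(1000)); // 31
        System.out.println(minIndexFor(1000));       // 32, i.e. 1024
        System.out.println(maxIndexFor(1000));       // 31, i.e. 512
    }
}
```

With the default arguments, the handle is therefore created as HandleImpl(3, 38, 2048), and its starting index is 33.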

Next, HandleImpl has a method, record(), that I think best shows how elastic the ByteBuf sizing is. It compares the number of bytes actually read with the previously predicted ByteBuf size and dynamically adjusts the size of the next ByteBuf to be created, all in the interest of system performance.
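
To watch record() at work, here is a small simulation. It is a sketch under stated assumptions rather than Netty's class: AdaptiveGuessSketch is a made-up name, the indices 3, 38 and 33 are hard-coded for the defaults 64, 65536 and 2048, and record() is fed directly instead of being driven by real socket reads.

```java
public class AdaptiveGuessSketch {

    static final int[] SIZE_TABLE = new int[53];
    static {
        int n = 0;
        for (int i = 16; i < 512; i += 16) { SIZE_TABLE[n++] = i; }
        for (int i = 512; i > 0; i <<= 1) { SIZE_TABLE[n++] = i; }
    }

    static final int INDEX_INCREMENT = 4;
    static final int INDEX_DECREMENT = 1;

    // Assumed defaults: table indices of 64 and 65536, starting at the index of 2048.
    final int minIndex = 3;
    final int maxIndex = 38;
    int index = 33;
    int nextReceiveBufferSize = SIZE_TABLE[index];
    boolean decreaseNow;

    // Same branching as HandleImpl.record() in the listing above.
    void record(int actualReadBytes) {
        if (actualReadBytes <= SIZE_TABLE[Math.max(0, index - INDEX_DECREMENT)]) {
            if (decreaseNow) {
                // Second small read in a row: shrink by one table step.
                index = Math.max(index - INDEX_DECREMENT, minIndex);
                nextReceiveBufferSize = SIZE_TABLE[index];
                decreaseNow = false;
            } else {
                // First small read: only remember it.
                decreaseNow = true;
            }
        } else if (actualReadBytes >= nextReceiveBufferSize) {
            // Buffer was filled: grow by four table steps at once.
            index = Math.min(index + INDEX_INCREMENT, maxIndex);
            nextReceiveBufferSize = SIZE_TABLE[index];
            decreaseNow = false;
        }
    }

    int guess() {
        return nextReceiveBufferSize;
    }

    public static void main(String[] args) {
        AdaptiveGuessSketch handle = new AdaptiveGuessSketch();
        int[] reads = {2048, 32768, 1000, 1000, 1000};
        for (int bytes : reads) {
            handle.record(bytes);
            System.out.println("read " + bytes + " -> next guess " + handle.guess());
        }
    }
}
```

Running it prints next guesses of 32768, 65536, 65536, 32768 and 32768. Growth is immediate and large (four table steps, capped at maxIndex), while shrinking needs two small reads in a row and moves only one step, so a single short read does not throw away a large buffer size.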

With that, we have finished reading RecvByteBufAllocator!

A brief supplement on unsafe.read()

From the last walkthrough of message processing, we already know that final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle() is what actually creates the HandleImpl instance.

NioByteUnsafe.read():

        @Override
        public final void read() {
            // Get the client config
            final ChannelConfig config = config();
            if (shouldBreakReadReady(config)) {
                clearReadPending();
                return;
            }
            // Get the client pipeline
            final ChannelPipeline pipeline = pipeline();
            // Get the buffer allocator - As long as the platform is not Android, the obtained buffer allocator is the pooled memory management buffer allocator
            final ByteBufAllocator allocator = config.getAllocator();
            // RecvByteBufAllocator - Controls read loops and predicts the size of the next ByteBuf to be created
            // So allocHandle here is actually the HandleImpl instance
            final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
            // Reset the handle's per-loop counters
            allocHandle.reset(config);
            // ByteBuf is Netty's enhanced counterpart to the JDK ByteBuffer
            ByteBuf byteBuf = null;
            boolean close = false;
            try {
                do {
                    // Parameter: the buffer allocator for pooled memory management, which actually does the allocation
                    // allocHandle is just to predict how much memory to allocate
                    // Allocate memory for byteBuf - the first allocation will be 2048
                    byteBuf = allocHandle.allocate(allocator);

                    // doReadBytes(byteBuf) - reads the data currently in the socket buffer into byteBuf and returns the number of bytes actually read from the SocketChannel
                    // lastBytesRead() - records that byte count in the receive-buffer allocator handle
                    allocHandle.lastBytesRead(doReadBytes(byteBuf));

                    // true when:
                    // a. the channel's underlying socket read buffer has been drained and no data is available (returns 0)
                    // b. the peer closed the channel (returns -1)
                    if (allocHandle.lastBytesRead() <= 0) {
                        // nothing was read. release the buffer.
                        byteBuf.release();
                        byteBuf = null;
                        close = allocHandle.lastBytesRead() < 0;
                        if (close) {
                            // There is nothing left to read as we received an EOF.
                            readPending = false;
                        }
                        break;
                    }
                    // Increment the message count kept by the receive-buffer allocator handle
                    allocHandle.incMessagesRead(1);
                    readPending = false;
                    // The ChannelRead() event is propagated to the client Channel pipeline, where interested handlers can do some business
                    pipeline.fireChannelRead(byteBuf); // The ChannelRead() event is broadcast on every read
                    byteBuf = null;
                } while (allocHandle.continueReading());

                // All messages for this read loop have been read

                allocHandle.readComplete(); // Calls HandleImpl.readComplete()
                // Propagate the ChannelReadComplete() event through the client Channel's pipeline
                // The SelectionKey's interest set still contains read, so the selector keeps listening for readable events on this channel
                pipeline.fireChannelReadComplete();

                if (close) {
                    closeOnRead(pipeline);
                }
            } catch (Throwable t) {
                handleReadException(pipeline, byteBuf, t, close, allocHandle);
            } finally {
                // Check if there is a readPending which was not processed yet.
                // This could be for two reasons:
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
                //
                // See https://github.com/netty/netty/issues/2254
                if (!readPending && !config.isAutoRead()) {
                    removeReadOp();
                }
            }
        }

allocHandle.reset(config) is called before entering the loop. What happens there?

        // Reset the current handler
        @Override
        public void reset(ChannelConfig config) {
            this.config = config;
            // Resets the maximum number of messages that can be read in one read loop - 16 by default, for both servers and clients
            maxMessagePerRead = maxMessagesPerRead();
            // Reset the statistics field
            totalMessages = totalBytesRead = 0;
        }

This ensures that the Handle's counters start fresh each time it enters the read loop, but nextReceiveBufferSize is not reset.

Then the read loop creates a ByteBuf, reads data via doReadBytes, and passes the method's return value, the number of bytes actually read, to lastBytesRead(). What does that do?

        @Override
        public void lastBytesRead(int bytes) {
            // If we read as much as we asked for we should check if we need to ramp up the size of our next guess.
            // This helps adjust more quickly when large amounts of data is pending and can avoid going back to
            // the selector to check for more data. Going back to the selector can add significant latency for large
            // data transfers.
            // The condition is true when the read filled the buffer, i.e. bytes read equals bytes attempted
            if (bytes == attemptedBytesRead()) {
                // record() may update nextReceiveBufferSize
                // Since the predicted capacity was filled, the Channel may still hold a lot of data,
                // so a larger ByteBuf may be worth allocating next time
                // Argument: the number of bytes actually read this time
                record(bytes);
            }
            super.lastBytesRead(bytes);
        }

If the buffer was filled, meaning the Channel may still hold unread data, record() is called to set the size of the ByteBuf to be created next time.

After that, allocHandle.continueReading() controls the read loop.

After exiting the read loop, allocHandle.readComplete() is called.

        @Override
        public void readComplete() {
            // The argument is the total number of bytes read during this read loop
            // This rescales the ByteBuf size once more:
            // it judges the Channel's data volume from that total, so that the next time data arrives
            // a suitably sized ByteBuf is allocated straight away
            record(totalBytesRead());
        }

As you can see, record() is called again, and the argument passed is the total number of bytes read in this read loop.

This way, the next time the client sends data, the ByteBuf is sized according to how much data arrived last time: after a large transfer a much larger ByteBuf is created, so the data can be read in fewer reads. In essence, this too is for the sake of system performance.
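
The whole cycle (allocate, read, lastBytesRead(), continueReading(), readComplete()) can be simulated without a socket. The sketch below is an illustration under assumptions, not Netty's code: ReadLoopSketch and readLoop() are made-up names, the "channel" is just a count of pending bytes, the attempted read size is taken to be the whole buffer, and continueReading() is reduced to the buffer-filled check plus the limit of 16 messages.

```java
public class ReadLoopSketch {

    static final int[] SIZE_TABLE = new int[53];
    static {
        int n = 0;
        for (int i = 16; i < 512; i += 16) { SIZE_TABLE[n++] = i; }
        for (int i = 512; i > 0; i <<= 1) { SIZE_TABLE[n++] = i; }
    }

    // Assumed defaults: indices of 64 and 65536, starting at the index of 2048.
    static final int MIN_INDEX = 3;
    static final int MAX_INDEX = 38;
    int index = 33;
    int next = SIZE_TABLE[index];
    boolean decreaseNow;

    // Same branching as HandleImpl.record().
    void record(int actual) {
        if (actual <= SIZE_TABLE[Math.max(0, index - 1)]) {
            if (decreaseNow) {
                index = Math.max(index - 1, MIN_INDEX);
                next = SIZE_TABLE[index];
                decreaseNow = false;
            } else {
                decreaseNow = true;
            }
        } else if (actual >= next) {
            index = Math.min(index + 4, MAX_INDEX);
            next = SIZE_TABLE[index];
            decreaseNow = false;
        }
    }

    // Simulates one read loop over `pending` bytes; returns the number of reads it took.
    int readLoop(int pending) {
        int totalBytes = 0;
        int reads = 0;
        int attempted = 0;
        int last = 0;
        do {
            attempted = next;                    // allocate(): buffer of the guessed size
            last = Math.min(pending, attempted); // doReadBytes(): as much as fits
            pending -= last;
            if (last == attempted) {
                record(last);                    // lastBytesRead(): buffer was filled
            }
            if (last <= 0) {
                break;                           // nothing left to read
            }
            totalBytes += last;
            reads++;
        } while (attempted == last && reads < 16); // simplified continueReading()
        record(totalBytes);                      // readComplete()
        return reads;
    }

    public static void main(String[] args) {
        ReadLoopSketch handle = new ReadLoopSketch();
        System.out.println(handle.readLoop(100000) + " reads, next size " + handle.next);
        System.out.println(handle.readLoop(300) + " reads, next size " + handle.next);
        System.out.println(handle.readLoop(300) + " reads, next size " + handle.next);
    }
}
```

In this simulation the 100000-byte burst is drained in 3 reads (2048, then 32768, then the remaining 65184 bytes) and leaves the next size at 65536. The two small loops that follow each take 1 read, and only after the second one does the size step down to 32768.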