Understanding RecvByteBufAllocator
In the previous article on message processing, a RecvByteBufAllocator kept appearing and made the code logic harder to follow, so this article reads through it in detail.
RecvByteBufAllocator
This is the top-level interface that AdaptiveRecvByteBufAllocator ultimately implements. Let's go through its methods one by one.
public interface RecvByteBufAllocator {
/**
 * Creates a new handle. The handle provides the actual operations and keeps the internal information which is
 * required for predicting an optimal buffer capacity.
 */
// Creates a handle that predicts the size of the next ByteBuf to allocate
Handle newHandle();
/**
 * @deprecated Use {@link ExtendedHandle}.
*/
@Deprecated
interface Handle {
/**
 * Creates a new receive buffer whose capacity is probably large enough to read all inbound data and small
 * enough not to waste its space.
 */
// Allocates the ByteBuf. The handle supplies the predicted size to allocate.
// Argument: alloc - the object that actually allocates the memory
ByteBuf allocate(ByteBufAllocator alloc);
/**
* Similar to {@link #allocate(ByteBufAllocator)} except that it does not allocate anything but just tells the
* capacity.
*/
// Returns the predicted ByteBuf size
int guess();
/**
* Reset any counters that have accumulated and recommend how many messages/bytes should be read for the next
* read loop.
* <p>
* This may be used by {@link #continueReading()} to determine if the read operation should complete.
* </p>
* This is only ever a hint and may be ignored by the implementation.
* @param config The channel configuration which may impact this object's behavior.
*/
void reset(ChannelConfig config);
/**
* Increment the number of messages that have been read for the current read loop.
* @param numMessages The amount to increment by.
*/
// Increases the number of read messages
void incMessagesRead(int numMessages);
/**
 * Set the bytes that have been read for the last read operation.
 * This may be used to increment the number of bytes that have been read.
 * @param bytes The number of bytes from the previous read operation. This may be negative if an read error
* occurs. If a negative value is seen it is expected to be return on the next call to
* {@link #lastBytesRead()}. A negative value will signal a termination condition enforced externally
* to this class and is not required to be enforced in {@link #continueReading()}.
*/
// Set the Size of the last read from a Channel
void lastBytesRead(int bytes);
/**
* Get the amount of bytes for the previous read operation.
* @return The amount of bytes for the previous read operation.
*/
// Returns the number of bytes returned by the last read from the Channel
int lastBytesRead();
/**
* Set how many bytes the read operation will (or did) attempt to read.
* @param bytes How many bytes the read operation will (or did) attempt to read.
*/
// Sets the amount of data that you want to read, or that has already been read
void attemptedBytesRead(int bytes);
/**
* Get how many bytes the read operation will (or did) attempt to read.
* @return How many bytes the read operation will (or did) attempt to read.
*/
// Returns the number of bytes the read will attempt (or did attempt) to read
int attemptedBytesRead();
/**
* Determine if the current read loop should continue.
* @return {@code true} if the read loop should continue reading. {@code false} if the read loop is complete.
*/
// Determines whether the read loop should continue; the read loops are in NioMessageUnsafe.read() and NioByteUnsafe.read()
boolean continueReading();
/**
 * The read has completed.
 */
// The current read loop is complete
void readComplete();
}
...
MaxMessagesRecvByteBufAllocator
// A preliminary enhancement to RecvByteBufAllocator
public interface MaxMessagesRecvByteBufAllocator extends RecvByteBufAllocator {
/**
* Returns the maximum number of messages to read per read loop.
* a {@link ChannelInboundHandler#channelRead(ChannelHandlerContext, Object) channelRead()} event.
* If this value is greater than 1, an event loop might attempt to read multiple times to procure multiple messages.
*/
// Returns the maximum number of messages that can be read per read loop; each read from the Channel counts as one message
int maxMessagesPerRead();
/**
 * Sets the maximum number of messages to read per read loop.
 * If this value is greater than 1, an event loop might attempt to read multiple times to procure multiple messages.
 */
// Set the maximum number of messages that can be read per read loop
MaxMessagesRecvByteBufAllocator maxMessagesPerRead(int maxMessagesPerRead);
}
Next, let's look at the class that implements MaxMessagesRecvByteBufAllocator.
DefaultMaxMessagesRecvByteBufAllocator
public abstract class DefaultMaxMessagesRecvByteBufAllocator implements MaxMessagesRecvByteBufAllocator {
// Indicates the maximum number of messages that can be read per read loop
private volatile int maxMessagesPerRead;
private volatile boolean respectMaybeMoreData = true;
...
// An enhancement of ExtendedHandle
// The outer class only provides arguments to the inner class. It does nothing else.
public abstract class MaxMessageHandle implements ExtendedHandle {
// The channel's config
private ChannelConfig config;
// Maximum number of messages that can be read in each read loop
private int maxMessagePerRead;
// Number of messages read so far
private int totalMessages;
// Total number of bytes read so far
private int totalBytesRead;
// Number of bytes the read will attempt (or did attempt) to read
private int attemptedBytesRead;
// Number of bytes returned by the last read
private int lastBytesRead;
// true by default
private final boolean respectMaybeMoreData = DefaultMaxMessagesRecvByteBufAllocator.this.respectMaybeMoreData;
private final UncheckedBooleanSupplier defaultMaybeMoreSupplier = new UncheckedBooleanSupplier() {
@Override
public boolean get() {
// Compares the number of bytes we attempted to read with the number of bytes the last read returned
// true - the buffer was filled, so the Channel may still hold unread data and reading should continue
// false - a. the allocated ByteBuf was larger than the remaining data, or b. the channel was closed (lastBytesRead == -1)
// false means there is no need to keep reading
return attemptedBytesRead == lastBytesRead;
}
};
/**
 * Only {@link ChannelConfig#getMaxMessagesPerRead()} is used.
 */
// Resets the current handle
@Override
public void reset(ChannelConfig config) {
this.config = config;
// Reset the maximum number of messages that can be read in one read loop: 16 by default, for both servers and clients
maxMessagePerRead = maxMessagesPerRead();
// Reset the statistics field
totalMessages = totalBytesRead = 0;
}
// Argument: alloc - The allocator that actually allocates memory
@Override
public ByteBuf allocate(ByteBufAllocator alloc) {
// guess() returns the predicted ByteBuf size, chosen from what earlier reads observed
// alloc.ioBuffer(size) creates a ByteBuf with that predicted size
return alloc.ioBuffer(guess());
}
@Override
public final void incMessagesRead(int amt) {
totalMessages += amt;
}
@Override
public void lastBytesRead(int bytes) {
// Record the number of bytes returned by the last read
lastBytesRead = bytes;
if (bytes > 0) {
totalBytesRead += bytes; // Update the total number of bytes read
}
}
@Override
public final int lastBytesRead() {
return lastBytesRead;
}
// Core method: the outer layer calls this to control the read loop
@Override
public boolean continueReading() {
return continueReading(defaultMaybeMoreSupplier);
}
@Override
public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
// The read loop continues only when all four conditions are true
// Condition 1: config.isAutoRead(), which defaults to true
// Condition 2: !respectMaybeMoreData || maybeMoreDataSupplier.get(); by default this checks whether the last read
// returned exactly as many bytes as it attempted to read, which suggests more data is waiting
// Condition 3: totalMessages < maxMessagePerRead; one unsafe.read() call reads from a channel at most 16 times
// Condition 4: totalBytesRead > 0
// Client: usually true, because every read that returns data adds its byte count to totalBytesRead
// It becomes false only if the total overflows past Integer.MAX_VALUE
// Server: unsafe.read() does not update totalBytesRead, so this is false; a server read loop actually runs only once
return config.isAutoRead() &&
(!respectMaybeMoreData || maybeMoreDataSupplier.get()) &&
totalMessages < maxMessagePerRead &&
totalBytesRead > 0;
}
@Override
public void readComplete() {
}
@Override
public int attemptedBytesRead() {
return attemptedBytesRead;
}
@Override
public void attemptedBytesRead(int bytes) {
attemptedBytesRead = bytes;
}
// Returns the total number of bytes read (capped at Integer.MAX_VALUE if the counter overflowed)
protected final int totalBytesRead() {
return totalBytesRead < 0 ? Integer.MAX_VALUE : totalBytesRead;
}
}
}
As we can see, the outer class mainly supplies parameters to the inner class; in other words, the inner class is initialized from the outer class's state.
In this class, the method we care about most is continueReading(), which decides whether the read loop keeps going. The loop continues only if four conditions hold at the same time: auto-read is enabled, the last read suggests more data may be waiting (or respectMaybeMoreData is turned off), fewer than maxMessagePerRead messages have been read, and totalBytesRead is greater than 0.
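To make the four conditions easier to experiment with, here is a minimal sketch that restates them as a pure function. It is not Netty code: the class name ContinueReadingSketch and the flat parameter list are assumptions made for illustration, and the real method reads these values from the handle's fields and the ChannelConfig.

```java
// Illustrative stand-in, not Netty code: the four checks of continueReading()
// expressed as a pure function over plain values.
final class ContinueReadingSketch {

    static boolean continueReading(boolean autoRead,
                                   boolean respectMaybeMoreData,
                                   int attemptedBytesRead,
                                   int lastBytesRead,
                                   int totalMessages,
                                   int maxMessagesPerRead,
                                   int totalBytesRead) {
        // Default "maybe more data" check: the last read filled the buffer.
        boolean maybeMoreData = attemptedBytesRead == lastBytesRead;
        return autoRead
                && (!respectMaybeMoreData || maybeMoreData)
                && totalMessages < maxMessagesPerRead
                && totalBytesRead > 0;
    }

    public static void main(String[] args) {
        // A 2048-byte buffer was completely filled on the first read: keep reading.
        System.out.println(continueReading(true, true, 2048, 2048, 1, 16, 2048));   // true
        // Only 100 of 2048 bytes arrived: the socket is probably drained, so stop.
        System.out.println(continueReading(true, true, 2048, 100, 1, 16, 100));     // false
        // 16 messages were already read in this loop: stop even though the buffer was full.
        System.out.println(continueReading(true, true, 2048, 2048, 16, 16, 32768)); // false
        // Server-style loop where totalBytesRead is never updated: stop after one pass.
        System.out.println(continueReading(true, true, 0, 0, 1, 16, 0));            // false
    }
}
```

The last call mirrors the server case described in the comments above: because totalBytesRead stays at 0, the loop body runs only once.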
Next comes the class that matters most here: AdaptiveRecvByteBufAllocator.
AdaptiveRecvByteBufAllocator
/**
* The {@link RecvByteBufAllocator} that automatically increases and
* decreases the predicted buffer size on feed back.
* <p>
* It gradually increases the expected number of readable bytes if the previous
* read fully filled the allocated buffer. It gradually decreases the expected
* number of readable bytes if the read operation was not able to fill a certain
* amount of the allocated buffer two times consecutively. Otherwise, it keeps
* returning the same prediction.
*/
public class AdaptiveRecvByteBufAllocator extends DefaultMaxMessagesRecvByteBufAllocator {
// Default minimum value
static final int DEFAULT_MINIMUM = 64;
// Use an initial value that is bigger than the common MTU of 1500
static final int DEFAULT_INITIAL = 2048; // Default initial value
static final int DEFAULT_MAXIMUM = 65536; // Default maximum value
// Index increment: 4
private static final int INDEX_INCREMENT = 4;
// Index decrement: 1
private static final int INDEX_DECREMENT = 1;
// Size table: stores the candidate sizes used by guess()
// guess() picks a suitable size from the table, which is then passed to the allocator to create the ByteBuf
private static final int[] SIZE_TABLE;
// Static initializer
static {
List<Integer> sizeTable = new ArrayList<Integer>();
// Sizes below 512 grow in steps of 16
for (int i = 16; i < 512; i += 16) {
sizeTable.add(i);
}
// Suppress a warning since i becomes negative when an integer overflow happens
// From 512 upward, the size doubles each time
// The loop ends when i overflows int and turns negative, so the largest entry is 2^30
for (int i = 512; i > 0; i <<= 1) { // lgtm[java/constant-comparison]
sizeTable.add(i);
}
// Next, copy the list into SIZE_TABLE
SIZE_TABLE = new int[sizeTable.size()];
for (int i = 0; i < SIZE_TABLE.length; i ++) {
SIZE_TABLE[i] = sizeTable.get(i);
}
}
/**
 * @deprecated There is state for {@link #maxMessagesPerRead()} which is typically based upon channel type.
*/
@Deprecated
public static final AdaptiveRecvByteBufAllocator DEFAULT = new AdaptiveRecvByteBufAllocator();
// Binary search: finds the index in SIZE_TABLE for the given size (the value at that index may be less than or greater than size, but it is a neighboring entry)
private static int getSizeTableIndex(final int size) {
for (int low = 0, high = SIZE_TABLE.length - 1;;) {
if (high < low) {
return low;
}
if (high == low) {
return high;
}
int mid = low + high >>> 1;
int a = SIZE_TABLE[mid];
int b = SIZE_TABLE[mid + 1];
if (size > b) {
low = mid + 1;
} else if (size < a) {
high = mid - 1;
} else if (size == a) {
return mid;
} else {
return mid + 1;
}
}
}
// The inner class HandleImpl is initialized from the outer class's fields
// It is an enhancement of MaxMessageHandle
private final class HandleImpl extends MaxMessageHandle {
private final int minIndex;
private final int maxIndex;
private int index;
// Specifies the capacity of the next allocated byteBuf
private int nextReceiveBufferSize;
private boolean decreaseNow;
HandleImpl(int minIndex, int maxIndex, int initial) {
this.minIndex = minIndex;
this.maxIndex = maxIndex;
// Compute the index of the initial value (2048 by default) in SIZE_TABLE
index = getSizeTableIndex(initial);
// The initial value of nextReceiveBufferSize is 2048
nextReceiveBufferSize = SIZE_TABLE[index];
}
@Override
public void lastBytesRead(int bytes) {
// If we read as much as we asked for we should check if we need to ramp up the size of our next guess.
// This helps adjust more quickly when large amounts of data is pending and can avoid going back to
// the selector to check for more data. Going back to the selector can add significant latency for large
// data transfers.
// True when the number of bytes read equals the number of bytes we attempted to read, i.e. the ByteBuf was filled
if (bytes == attemptedBytesRead()) {
// record() may update nextReceiveBufferSize
// Because the buffer was filled, the Channel may still hold a lot of unread data, so a larger ByteBuf may be needed
// Parameter: the actual amount of data read this time
record(bytes);
}
super.lastBytesRead(bytes);
}
@Override
public int guess() {
return nextReceiveBufferSize;
}
// Parameter: the actual amount of data read this time
private void record(int actualReadBytes) {
// For example:
// Assume SIZE_TABLE[index] = 2048, so SIZE_TABLE[index - 1] = 1024
// If the amount actually read is <= 1024, the channel does not hold that much data, so there is no need to create such a large ByteBuf
if (actualReadBytes <= SIZE_TABLE[max(0, index - INDEX_DECREMENT)]) {
// false the first time we get here
// true the second time
if (decreaseNow) {
// Decrement index, but never below minIndex, so SIZE_TABLE[index] >= minimum
index = max(index - INDEX_DECREMENT, minIndex);
// Set the size of the next ByteBuf to create; it shrinks here
nextReceiveBufferSize = SIZE_TABLE[index];
// nextReceiveBufferSize is only updated after the outer condition has been met twice in a row
decreaseNow = false;
} else {
// Set to true on the first occurrence
decreaseNow = true;
}
} else if (actualReadBytes >= nextReceiveBufferSize) {
// Growth branch: the amount actually read is at least the predicted size, so the Channel may hold much more data
// Increment index, but never above maxIndex, so SIZE_TABLE[index] <= maximum
index = min(index + INDEX_INCREMENT, maxIndex);
// Set the size of the next ByteBuf to create; it grows here
nextReceiveBufferSize = SIZE_TABLE[index];
decreaseNow = false; // Cancel any pending decrease
}
}
@Override
public void readComplete() {
// The argument is the total number of bytes read during this read loop
// This rescales the ByteBuf size:
// it estimates how much data the Channel carries, so that the next time data arrives on the Channel
// the ByteBuf is created with a suitable size right away
record(totalBytesRead());
}
}
private final int minIndex;
private final int maxIndex;
private final int initial;
/**
* Creates a new predictor with the default parameters. With the default
* parameters, the expected buffer size starts from {@code 1024}, does not
 * go down below {@code 64}, and does not go up above {@code 65536}.
 */
public AdaptiveRecvByteBufAllocator() {
// Parameter 1: 64
// Parameter 2: 2048
// Parameter 3: 65536
this(DEFAULT_MINIMUM, DEFAULT_INITIAL, DEFAULT_MAXIMUM);
}
/**
* Creates a new predictor with the specified parameters.
*
* @param minimum the inclusive lower bound of the expected buffer size
* @param initial the initial buffer size when no feed back was received
* @param maximum the inclusive upper bound of the expected buffer size
*/
// Parameter 1: 64
// Parameter 2: 2048
// Parameter 3: 65536
// Assigns minIndex, maxIndex and initial
public AdaptiveRecvByteBufAllocator(int minimum, int initial, int maximum) {
checkPositive(minimum, "minimum");
if (initial < minimum) {
throw new IllegalArgumentException("initial: " + initial);
}
if (maximum < initial) {
throw new IllegalArgumentException("maximum: " + maximum);
}
// Binary search algorithm, find the subscript of the nearest minimum in the size table.
int minIndex = getSizeTableIndex(minimum);
if (SIZE_TABLE[minIndex] < minimum) {
// Ensure that SIZE_TABLE[minIndex] >= minimum
this.minIndex = minIndex + 1;
} else {
// Otherwise the index found is already a suitable minIndex
this.minIndex = minIndex;
}
// Binary search: finds the index in SIZE_TABLE for maximum; the value at that index may be less than or greater than maximum
int maxIndex = getSizeTableIndex(maximum);
if (SIZE_TABLE[maxIndex] > maximum) {
// Make sure that SIZE_TABLE[maxIndex] <= maximum - make sure that you do not exceed the threshold
this.maxIndex = maxIndex - 1;
} else {
// find the appropriate maxIndex
this.maxIndex = maxIndex;
}
// Initial value: 2048
this.initial = initial;
}
@SuppressWarnings("deprecation")
// Creates the HandleImpl
@Override
public Handle newHandle() {
// These arguments are the values set by the AdaptiveRecvByteBufAllocator constructor
// Argument 1: the SIZE_TABLE index corresponding to 64
// Argument 2: the SIZE_TABLE index corresponding to 65536
// Argument 3: 2048
return new HandleImpl(minIndex, maxIndex, initial);
}
@Override
public AdaptiveRecvByteBufAllocator respectMaybeMoreData(boolean respectMaybeMoreData) {
super.respectMaybeMoreData(respectMaybeMoreData);
return this;
}
}
The method to focus on here is newHandle(), which creates a HandleImpl instance. HandleImpl extends DefaultMaxMessagesRecvByteBufAllocator.MaxMessageHandle, MaxMessageHandle implements RecvByteBufAllocator.ExtendedHandle, and ExtendedHandle in turn extends RecvByteBufAllocator.Handle. As you can see, everything covered so far links together.
This class holds an int array, SIZE_TABLE, containing the candidate sizes used to predict how much memory the next ByteBuf needs. guess() returns the size chosen for the next ByteBuf.
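The static initializer shows which values it stores: 16, 32, 48, ..., 496 in steps of 16, then 512, 1024, 2048, ... doubling each time until the int overflows. The largest entry is therefore 2^30 (1073741824), the largest power of two that fits in an int, rather than Integer.MAX_VALUE itself.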
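The table is small enough to rebuild and inspect directly. The sketch below copies the two loops from the static initializer into a standalone class; the class name SizeTableSketch and the method buildSizeTable() are made up for this illustration and do not exist in Netty.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative stand-in, not Netty code: rebuilds the size table with the
// same two loops as the static initializer shown above.
final class SizeTableSketch {

    static int[] buildSizeTable() {
        List<Integer> sizes = new ArrayList<>();
        // 16, 32, ..., 496: steps of 16 below 512
        for (int i = 16; i < 512; i += 16) {
            sizes.add(i);
        }
        // 512, 1024, ...: doubling until the int overflows and turns negative
        for (int i = 512; i > 0; i <<= 1) {
            sizes.add(i);
        }
        int[] table = new int[sizes.size()];
        for (int i = 0; i < table.length; i++) {
            table[i] = sizes.get(i);
        }
        return table;
    }

    public static void main(String[] args) {
        int[] table = buildSizeTable();
        System.out.println(table.length);            // 53
        System.out.println(table[0]);                // 16
        System.out.println(table[30]);               // 496
        System.out.println(table[31]);               // 512
        System.out.println(table[33]);               // 2048
        System.out.println(table[table.length - 1]); // 1073741824
    }
}
```

The first loop contributes 31 entries and the second 22 (2^9 through 2^30), which gives the 53 entries printed above.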
At this point, what we care about most is the first value guess() returns, which is nextReceiveBufferSize. It is initialized in the HandleImpl constructor, i.e. nextReceiveBufferSize = SIZE_TABLE[index].
In this Netty version the default initial value is 2048 (1024 << 1), so the first ByteBuf is created with a size of 2048.
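The ByteBuf size is bounded by minIndex and maxIndex, the SIZE_TABLE indexes derived from DEFAULT_MINIMUM and DEFAULT_MAXIMUM. Both are found with the binary search in getSizeTableIndex(). The table value at the returned index may be smaller or larger than the requested size (it is a neighboring entry), which is why the constructor then shifts the index by one when needed, so that SIZE_TABLE[minIndex] >= minimum and SIZE_TABLE[maxIndex] <= maximum.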
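A small sketch of how the index lookup and the bound adjustment work together. The search loop is copied from getSizeTableIndex() above; the class name SizeTableIndexSketch and the helpers lowerBoundIndex() and upperBoundIndex() are hypothetical names that wrap the two if/else adjustments from the constructor.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative stand-in, not Netty code: the binary search from
// getSizeTableIndex() plus the min/max adjustment done in the constructor.
final class SizeTableIndexSketch {

    static final int[] SIZE_TABLE = buildSizeTable();

    static int[] buildSizeTable() {
        List<Integer> sizes = new ArrayList<>();
        for (int i = 16; i < 512; i += 16) {
            sizes.add(i);
        }
        for (int i = 512; i > 0; i <<= 1) {
            sizes.add(i);
        }
        int[] table = new int[sizes.size()];
        for (int i = 0; i < table.length; i++) {
            table[i] = sizes.get(i);
        }
        return table;
    }

    // Same search as in the listing above.
    static int getSizeTableIndex(int size) {
        for (int low = 0, high = SIZE_TABLE.length - 1;;) {
            if (high < low) {
                return low;
            }
            if (high == low) {
                return high;
            }
            int mid = (low + high) >>> 1;
            int a = SIZE_TABLE[mid];
            int b = SIZE_TABLE[mid + 1];
            if (size > b) {
                low = mid + 1;
            } else if (size < a) {
                high = mid - 1;
            } else if (size == a) {
                return mid;
            } else {
                return mid + 1;
            }
        }
    }

    // Hypothetical helper: guarantees SIZE_TABLE[result] >= minimum.
    static int lowerBoundIndex(int minimum) {
        int i = getSizeTableIndex(minimum);
        return SIZE_TABLE[i] < minimum ? i + 1 : i;
    }

    // Hypothetical helper: guarantees SIZE_TABLE[result] <= maximum.
    static int upperBoundIndex(int maximum) {
        int i = getSizeTableIndex(maximum);
        return SIZE_TABLE[i] > maximum ? i - 1 : i;
    }

    public static void main(String[] args) {
        System.out.println(lowerBoundIndex(64));     // 3
        System.out.println(upperBoundIndex(65536));  // 38
        System.out.println(getSizeTableIndex(2048)); // 33
        // 1000 is not in the table: the raw search lands on 512 (index 31),
        // so a lower bound moves up to 1024 and an upper bound stays at 512.
        System.out.println(getSizeTableIndex(1000)); // 31
        System.out.println(lowerBoundIndex(1000));   // 32
        System.out.println(upperBoundIndex(1000));   // 31
    }
}
```

With the defaults 64, 2048 and 65536, all three values are exact table entries, so no adjustment takes place; the 1000 case shows why the adjustment exists.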
Next, HandleImpl has a method, record(), that I think best shows how elastic the ByteBuf sizing is. It compares the number of bytes actually read with the previously predicted ByteBuf size and adjusts the size of the next ByteBuf accordingly: it grows quickly (4 steps up the table) as soon as a read fills the buffer, and shrinks slowly (1 step down, and only after two consecutive small reads). The goal is better system performance.
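The grow-fast, shrink-slow behavior is easiest to see by feeding record() a few values. The sketch below copies the branching from record() but, as a simplifying assumption, runs it over a short table of powers of two instead of the real 53-entry SIZE_TABLE; the class name AdaptiveSizeSketch and its constructor are hypothetical.

```java
// Illustrative stand-in, not Netty code: the record() branching applied to a
// shortened, hypothetical size table so the steps are easy to follow.
final class AdaptiveSizeSketch {
    private static final int INDEX_INCREMENT = 4;
    private static final int INDEX_DECREMENT = 1;

    private final int[] sizeTable;
    private final int minIndex;
    private final int maxIndex;
    private int index;
    private int nextReceiveBufferSize;
    private boolean decreaseNow;

    AdaptiveSizeSketch(int[] sizeTable, int minIndex, int maxIndex, int initialIndex) {
        this.sizeTable = sizeTable;
        this.minIndex = minIndex;
        this.maxIndex = maxIndex;
        this.index = initialIndex;
        this.nextReceiveBufferSize = sizeTable[initialIndex];
    }

    int guess() {
        return nextReceiveBufferSize;
    }

    void record(int actualReadBytes) {
        if (actualReadBytes <= sizeTable[Math.max(0, index - INDEX_DECREMENT)]) {
            if (decreaseNow) {
                // Second small read in a row: step down by one entry.
                index = Math.max(index - INDEX_DECREMENT, minIndex);
                nextReceiveBufferSize = sizeTable[index];
                decreaseNow = false;
            } else {
                // First small read: only remember it.
                decreaseNow = true;
            }
        } else if (actualReadBytes >= nextReceiveBufferSize) {
            // The buffer was filled: jump up by four entries.
            index = Math.min(index + INDEX_INCREMENT, maxIndex);
            nextReceiveBufferSize = sizeTable[index];
            decreaseNow = false;
        }
    }

    public static void main(String[] args) {
        int[] table = {512, 1024, 2048, 4096, 8192, 16384, 32768, 65536};
        AdaptiveSizeSketch sketch = new AdaptiveSizeSketch(table, 0, 7, 2);
        System.out.println(sketch.guess()); // 2048
        sketch.record(2048);                // filled the buffer
        System.out.println(sketch.guess()); // 32768
        sketch.record(100);                 // first small read: no change yet
        System.out.println(sketch.guess()); // 32768
        sketch.record(100);                 // second small read: one step down
        System.out.println(sketch.guess()); // 16384
        sketch.record(10000);               // in between: prediction is kept
        System.out.println(sketch.guess()); // 16384
    }
}
```

One filled buffer is enough to jump from 2048 to 32768, while it takes two small reads to drop a single step, so a burst of traffic is absorbed quickly and a single short message does not throw the prediction away.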
That completes our reading of RecvByteBufAllocator!
A short supplement on unsafe.read()
From the previous article on message processing we already know that final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle() actually creates a HandleImpl instance.
Now let's look at NioByteUnsafe.read():
@Override
public final void read() {
// Get the client channel's config
final ChannelConfig config = config();
if (shouldBreakReadReady(config)) {
clearReadPending();
return;
}
// Get the client channel's pipeline
final ChannelPipeline pipeline = pipeline();
// Get the buffer allocator; unless the platform is Android, this is the pooled buffer allocator
final ByteBufAllocator allocator = config.getAllocator();
// RecvByteBufAllocator.Handle controls the read loop and predicts the size of the next ByteBuf to create
// So allocHandle here is actually the HandleImpl instance
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
// Reset the per-loop counters
allocHandle.reset(config);
// ByteBuf is Netty's enhanced counterpart of the JDK's ByteBuffer
ByteBuf byteBuf = null;
boolean close = false;
try {
do {
// Argument: the pooled buffer allocator, which actually performs the allocation
// allocHandle only predicts how much memory to allocate
// Allocate memory for byteBuf; the first allocation is 2048 bytes
byteBuf = allocHandle.allocate(allocator);
// doReadBytes(byteBuf) reads the socket buffer's data into byteBuf and returns the number of bytes actually read from the SocketChannel
// lastBytesRead(...) records that number in the handle
allocHandle.lastBytesRead(doReadBytes(byteBuf));
// True when:
// a. the channel's underlying socket read buffer has been drained and no data is available (returns 0)
// b. the peer has closed the channel (returns -1)
if (allocHandle.lastBytesRead() <= 0) {
// nothing was read. release the buffer.
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;
if (close) {
// There is nothing left to read as we received an EOF.
readPending = false;
}
break;
}
// Increment the handle's count of messages read
allocHandle.incMessagesRead(1);
readPending = false;
// Propagate the channelRead() event through the client channel's pipeline, where interested handlers can run business logic
pipeline.fireChannelRead(byteBuf); // channelRead() is fired for every read
byteBuf = null;
} while (allocHandle.continueReading());
// This read loop has finished
allocHandle.readComplete(); // Calls HandleImpl.readComplete()
// Propagate the channelReadComplete() event through the client channel's pipeline
// The SelectionKey's interest set still contains the read operation, so the selector keeps listening for readable events on this channel
pipeline.fireChannelReadComplete();
if (close) {
closeOnRead(pipeline);
}
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close, allocHandle);
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
allocHandle.reset(config) is called before entering the loop. What happens inside it?
// Reset the current handler
@Override
public void reset(ChannelConfig config) {
this.config = config;
// Reset the maximum number of messages that can be read in one read loop: 16 by default, for both servers and clients
maxMessagePerRead = maxMessagesPerRead();
// Reset the statistics field
totalMessages = totalBytesRead = 0;
}
This makes the Handle's per-loop counters start fresh each time the read loop is entered, but nextReceiveBufferSize is not reset.
The read loop then creates a ByteBuf, reads data through doReadBytes(), and passes the return value, the number of bytes actually read, to lastBytesRead(). What does that method do?
@Override
public void lastBytesRead(int bytes) {
// If we read as much as we asked for we should check if we need to ramp up the size of our next guess.
// This helps adjust more quickly when large amounts of data is pending and can avoid going back to
// the selector to check for more data. Going back to the selector can add significant latency for large
// data transfers.
// True when the number of bytes read equals the number of bytes we attempted to read, i.e. the ByteBuf was filled
if (bytes == attemptedBytesRead()) {
// record() may update nextReceiveBufferSize
// Because the buffer was filled, the Channel may still hold a lot of unread data, so a larger ByteBuf may be needed
// Parameter: the actual amount of data read this time
record(bytes);
}
super.lastBytesRead(bytes);
}
If the read filled the buffer, the Channel may still hold unread data, so record() is called to adjust the size of the next ByteBuf to create.
After that, allocHandle.continueReading() decides whether the read loop continues.
After the read loop exits, allocHandle.readComplete() is called:
@Override
public void readComplete() {
// The argument is the total number of bytes read during this read loop
// This rescales the ByteBuf size:
// it estimates how much data the Channel carries, so that the next time data arrives on the Channel
// the ByteBuf is created with a suitable size right away
record(totalBytesRead());
}
As you can see, record() is called again, and this time the argument is the total number of bytes read in this read loop.
As a result, if this loop read a lot of data, the next ByteBuf created when the client sends more data will be larger, sized according to what was received last time, so the data can be read in fewer reads. Again, this is done for the sake of system performance.
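To see how reset(), lastBytesRead(), continueReading() and readComplete() interact across several read loops, here is a rough simulation. It rests on several assumptions: the class ReadLoopSketch is hypothetical, a plain byte counter stands in for the socket, no real buffers are allocated, the attempted byte count is taken to be the full predicted buffer size, and auto-read is treated as always on. Only the control flow follows the listings above.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified simulation, not Netty code: a byte counter plays the socket and
// the handle's state is kept in plain fields.
final class ReadLoopSketch {
    private static final int INDEX_INCREMENT = 4;
    private static final int INDEX_DECREMENT = 1;
    private static final int MAX_MESSAGES_PER_READ = 16;
    private static final int[] SIZE_TABLE = buildSizeTable();

    private final int minIndex = 3;  // index of 64 in SIZE_TABLE
    private final int maxIndex = 38; // index of 65536 in SIZE_TABLE
    private int index = 33;          // index of 2048 in SIZE_TABLE
    private int nextReceiveBufferSize = SIZE_TABLE[index];
    private boolean decreaseNow;

    private static int[] buildSizeTable() {
        List<Integer> sizes = new ArrayList<>();
        for (int i = 16; i < 512; i += 16) {
            sizes.add(i);
        }
        for (int i = 512; i > 0; i <<= 1) {
            sizes.add(i);
        }
        int[] table = new int[sizes.size()];
        for (int i = 0; i < table.length; i++) {
            table[i] = sizes.get(i);
        }
        return table;
    }

    private void record(int actualReadBytes) {
        if (actualReadBytes <= SIZE_TABLE[Math.max(0, index - INDEX_DECREMENT)]) {
            if (decreaseNow) {
                index = Math.max(index - INDEX_DECREMENT, minIndex);
                nextReceiveBufferSize = SIZE_TABLE[index];
                decreaseNow = false;
            } else {
                decreaseNow = true;
            }
        } else if (actualReadBytes >= nextReceiveBufferSize) {
            index = Math.min(index + INDEX_INCREMENT, maxIndex);
            nextReceiveBufferSize = SIZE_TABLE[index];
            decreaseNow = false;
        }
    }

    int guess() {
        return nextReceiveBufferSize;
    }

    /** Runs one read loop over pendingBytes and returns the buffer sizes it used. */
    List<Integer> readLoop(int pendingBytes) {
        List<Integer> bufferSizes = new ArrayList<>();
        int totalMessages = 0;  // reset(): the per-loop counters start at zero,
        int totalBytesRead = 0; // while nextReceiveBufferSize is kept
        int attempted;
        int last;
        do {
            attempted = nextReceiveBufferSize;        // allocate(): size from guess()
            bufferSizes.add(attempted);
            last = Math.min(attempted, pendingBytes); // doReadBytes()
            pendingBytes -= last;
            if (last == attempted) {                  // lastBytesRead(): buffer was filled
                record(last);
            }
            if (last <= 0) {
                break;                                // nothing was read
            }
            totalBytesRead += last;
            totalMessages++;                          // incMessagesRead(1)
        } while (attempted == last                    // continueReading()
                && totalMessages < MAX_MESSAGES_PER_READ
                && totalBytesRead > 0);
        record(totalBytesRead);                       // readComplete()
        return bufferSizes;
    }

    public static void main(String[] args) {
        ReadLoopSketch sketch = new ReadLoopSketch();
        System.out.println(sketch.readLoop(100_000)); // [2048, 32768, 65536]
        System.out.println(sketch.guess());           // 65536
        System.out.println(sketch.readLoop(100));     // [65536]
        System.out.println(sketch.guess());           // 65536
        System.out.println(sketch.readLoop(100));     // [65536]
        System.out.println(sketch.guess());           // 32768
    }
}
```

In this simulation a 100000-byte burst is drained in three reads because the prediction ramps up inside the loop, and the large prediction survives into the next loop. It only starts to shrink after two loops in a row that read very little.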