Preface
I previously wrote "Source Code Analysis of the Whole Netty Server Startup Process". After the BossGroup accepts a client connection as a SocketChannel, it registers it with the WorkerGroup, which drives the data I/O reads and writes. When a WorkerGroup EventLoop detects an OP_READ event on a Channel, it calls the Channel's unsafe.read(); Netty wraps the read data as a ByteBuf and then triggers pipeline.fireChannelRead(byteBuf) to propagate the event.
That covered the overall flow, but not the details of how data is actually received. This article fills that gap.
Background knowledge
Those familiar with Java NIO programming know that to read data from a SocketChannel, you create a ByteBuffer and call socketChannel.read(byteBuffer). However, before the read actually happens, the program does not know how much data will arrive, so it does not know how large a ByteBuffer to create. Too large a buffer wastes memory; too small a buffer needs frequent expansion. Worse, ByteBuffer itself does not support in-place expansion: you must allocate a larger ByteBuffer and copy the contents over, which is expensive. How Netty solves this problem is explained later.
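The expansion cost can be seen in a minimal JDK-only sketch. The `grow` helper below is hypothetical (not a JDK API); it just makes explicit what "expanding" a ByteBuffer really means: allocate a bigger buffer and copy.

```java
import java.nio.ByteBuffer;

public class ByteBufferGrow {
    // Hypothetical helper: ByteBuffer cannot grow in place, so "expanding"
    // means allocating a larger buffer and copying everything written so far.
    static ByteBuffer grow(ByteBuffer old, int newCapacity) {
        ByteBuffer bigger = ByteBuffer.allocate(newCapacity);
        old.flip();          // switch the old buffer to read mode
        bigger.put(old);     // memory copy -- the cost Netty tries to avoid
        return bigger;
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(16);
        buf.put("0123456789ABCDEF".getBytes()); // buffer is now full
        ByteBuffer grown = grow(buf, 64);
        System.out.println(grown.position());   // 16 bytes carried over
        System.out.println(grown.capacity());   // 64
    }
}
```

Every expansion pays this copy again, which is why guessing a good initial capacity matters.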
There is another point readers need to know in advance. For a Channel registered with a Selector for OP_READ events, the Selector only reports that the Channel is readable; it does not tell you how many bytes are available. Selection is level-triggered: if a Channel has data to read and you do not drain it, the next select() call will return that Channel again. Since Netty cannot know how much data the peer will send, it limits each OP_READ event to at most 16 reads by default, to avoid blocking the handling of other I/O events.
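This level-triggered behavior can be demonstrated with the JDK's own `Pipe`, whose source channel is selectable just like a SocketChannel (a self-contained sketch; a real server would of course use sockets):

```java
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

public class LevelTriggeredDemo {
    public static void main(String[] args) throws Exception {
        Pipe pipe = Pipe.open();
        pipe.source().configureBlocking(false);
        Selector selector = Selector.open();
        pipe.source().register(selector, SelectionKey.OP_READ);

        // Make 4 bytes available on the channel.
        pipe.sink().write(ByteBuffer.wrap(new byte[]{1, 2, 3, 4}));

        // First select: the channel is readable. Read only part of the data.
        selector.select();
        selector.selectedKeys().clear();
        ByteBuffer two = ByteBuffer.allocate(2);
        pipe.source().read(two);
        System.out.println("read " + two.position() + " bytes");

        // Second select: because data was left unread, select() reports the
        // channel readable again -- exactly the behavior described above.
        int ready = selector.select(200);
        System.out.println("still readable: " + (ready > 0));
    }
}
```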
AbstractNioByteChannel.read() analysis
As analyzed in the earlier article, when Netty detects a readable event on a Channel, it calls the AbstractNioByteChannel.read() method. Here is the method's source with full annotations:
/* Triggered when the client sends data. See io.netty.channel.nio.NioEventLoop.processSelectedKey */
@Override
public final void read() {
    // Client Channel configuration
    final ChannelConfig config = config();
    if (shouldBreakReadReady(config)) {
        clearReadPending();
        return;
    }
    final ChannelPipeline pipeline = pipeline();
    // Obtain the ByteBufAllocator. The default is PooledByteBufAllocator.
    final ByteBufAllocator allocator = config.getAllocator();
    final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
    // Reset statistics
    allocHandle.reset(config);
    ByteBuf byteBuf = null;
    boolean close = false;
    try {
        /* When the peer sends a large packet, TCP fragments it, and the OP_READ event fires
           only once. Netty therefore loops up to 16 reads by default, so channelRead() may
           fire multiple times and may see half a packet. It does not matter if the data is
           not drained within 16 reads: a readable Channel keeps being returned by select(),
           so the remainder is handled on the next poll. */
        do {
            // Allocate a ByteBuf large enough to hold the readable data without wasting too much space.
            byteBuf = allocHandle.allocate(allocator);
            /* doReadBytes(byteBuf): the ByteBuf wraps a ByteBuffer, and the underlying call is
               SocketChannel.read(ByteBuffer). lastBytesRead() records the actual number of bytes
               read, so the size of the next allocated buffer can be adjusted adaptively. */
            allocHandle.lastBytesRead(doReadBytes(byteBuf));
            if (allocHandle.lastBytesRead() <= 0) {
                // No bytes were read; release the empty, meaningless ByteBuf.
                byteBuf.release();
                byteBuf = null;
                close = allocHandle.lastBytesRead() < 0;
                if (close) {
                    // There is nothing left to read as we received an EOF.
                    readPending = false;
                }
                break;
            }
            // Increment the number of messages read
            allocHandle.incMessagesRead(1);
            readPending = false;
            // Propagate the channelRead event through the pipeline
            pipeline.fireChannelRead(byteBuf);
            byteBuf = null;
        } while (allocHandle.continueReading()); // decide whether to continue reading

        // After reading, propagate channelReadComplete through the pipeline
        allocHandle.readComplete();
        pipeline.fireChannelReadComplete();

        if (close) {
            closeOnRead(pipeline);
        }
    } catch (Throwable t) {
        handleReadException(pipeline, byteBuf, t, close, allocHandle);
    } finally {
        if (!readPending && !config.isAutoRead()) {
            removeReadOp();
        }
    }
}
Here we focus first on the recvBufAllocHandle() method, which simply returns the recvHandle bound to the Channel, creating one if none is bound yet.
@Override
public RecvByteBufAllocator.Handle recvBufAllocHandle() {
    if (recvHandle == null) {
        // If the Channel has no recvHandle yet, create a new instance
        recvHandle = config().getRecvByteBufAllocator().newHandle();
    }
    return recvHandle;
}
Why does the Channel rely on a RecvByteBufAllocator.Handle to create the ByteBuf instead of creating it directly? As mentioned earlier, the Channel does not know how much data it is about to receive, so it cannot know how large a ByteBuf to create: too large wastes space, too small forces repeated expansion. For this reason the Channel hands the task of creating the ByteBuf to the RecvByteBufAllocator.Handle, hoping that it can make a statistical prediction based on historical reads and allocate a buffer large enough to hold all the data without wasting too much space.
The details of RecvByteBufAllocator.Handle are analyzed later; first, let's finish the read() flow.
After creating an appropriately sized ByteBuf, the Channel calls doReadBytes(ByteBuf) to read data into it:
@Override
protected int doReadBytes(ByteBuf byteBuf) throws Exception {
    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
    // Set the number of bytes to attempt to read: try to fill the ByteBuf
    allocHandle.attemptedBytesRead(byteBuf.writableBytes());
    // Read from the SocketChannel into byteBuf
    return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
}
ByteBuf.writeBytes(ScatteringByteChannel in, int length) converts the ByteBuf to a JDK ByteBuffer and reads through the JDK's native SocketChannel, returning the actual number of bytes read.
If the actual number of bytes read is less than or equal to zero, there is no data left to read; the handling of this OP_READ event ends and the pipeline.fireChannelReadComplete() callback is triggered. Otherwise, pipeline.fireChannelRead(byteBuf) is triggered to pass the read ByteBuf on to the ChannelHandlers.
If the packet sent by the peer is very large, the ByteBuf that was created probably cannot hold all of the data at once, so the Channel reads in a loop: the whole reading logic sits in a do-while loop, and allocHandle.continueReading() decides whether to keep reading. Note that a ChannelHandler's channelRead() can therefore be triggered multiple times even without TCP fragmentation, simply because the ByteBuf created was too small.
RecvByteBufAllocator analysis
RecvByteBufAllocator is Netty's receive-buffer allocator. A Channel relies on this allocator to create a ByteBuf of an appropriate size, improving performance while saving memory.
RecvByteBufAllocator itself is a simple interface; the real work is done by its inner interface Handle, so we can look at Handle directly.
interface Handle {
    /* Allocate an appropriately sized ByteBuf via the ByteBufAllocator.
       Too big: wastes space. Too small: frequent expansion causes memory-copy overhead. */
    ByteBuf allocate(ByteBufAllocator alloc);

    // Guess the number of bytes to allocate
    int guess();

    // Reset any accumulated counters and suggest how many bytes should be read in the next read loop.
    void reset(ChannelConfig config);

    // Increment the number of messages read
    void incMessagesRead(int numMessages);

    /* Set the number of bytes read last time; AdaptiveRecvByteBufAllocator uses this value
       to adaptively adjust the size of the next allocated buffer. */
    void lastBytesRead(int bytes);

    // Get the number of bytes read last time
    int lastBytesRead();

    // Set the number of bytes to attempt to read
    void attemptedBytesRead(int bytes);

    // Get the number of bytes to attempt to read
    int attemptedBytesRead();

    // Whether to continue reading
    boolean continueReading();

    // Reading is complete
    void readComplete();
}
The default RecvByteBufAllocator implementation is AdaptiveRecvByteBufAllocator, which adaptively adjusts the size of the allocated ByteBuf; it is the focus of this analysis.
The AdaptiveRecvByteBufAllocator class hierarchy is as follows:
Looking from the top down, MaxMessagesRecvByteBufAllocator is very simple: on top of the base interface, it adds a limit on the number of loop reads:
/* Netty reads in a loop when a readable event occurs, and continueReading() decides whether
   to continue. This interface limits the number of loop reads; the default is 16. */
public interface MaxMessagesRecvByteBufAllocator extends RecvByteBufAllocator {
    // Return the maximum number of loop reads
    int maxMessagesPerRead();

    // Set the maximum number of loop reads
    MaxMessagesRecvByteBufAllocator maxMessagesPerRead(int maxMessagesPerRead);
}
DefaultMaxMessagesRecvByteBufAllocator is the default implementation of MaxMessagesRecvByteBufAllocator. Look at its properties first:
// Maximum number of messages to read per read loop
private volatile int maxMessagesPerRead;

/* Whether to respect the possibility that there is more data to read. If true, Netty assumes
   there may still be data until a loop iteration reads 0 bytes, which can cost one extra
   useless read and the pointless creation of a ByteBuf. */
private volatile boolean respectMaybeMoreData = true;
The core logic is in the Handle, so you can look at the Handle directly:
public abstract class MaxMessageHandle implements ExtendedHandle {
    private ChannelConfig config;
    // Maximum number of loop reads, 16 by default. Data left unread is handled on the next select().
    private int maxMessagePerRead;
    // Total number of messages read
    private int totalMessages;
    // Total number of bytes read
    private int totalBytesRead;
    // Number of bytes to attempt to read. Defaults to the ByteBuf's writable bytes,
    // i.e. try to fill the ByteBuf as much as possible.
    private int attemptedBytesRead;
    // Number of bytes read last time, used to adjust the size of the next allocated buffer.
    private int lastBytesRead;
    private final boolean respectMaybeMoreData = DefaultMaxMessagesRecvByteBufAllocator.this.respectMaybeMoreData;

    /* Default judgment of whether there may be more data to read: attemptedBytesRead == lastBytesRead.
       If the ByteBuf was filled, there may be more data to read; otherwise reading stops
       and channelReadComplete() fires. */
    private final UncheckedBooleanSupplier defaultMaybeMoreSupplier = new UncheckedBooleanSupplier() {
        @Override
        public boolean get() {
            return attemptedBytesRead == lastBytesRead;
        }
    };

    // Reset the counters according to config; fired each time a new read event is processed.
    @Override
    public void reset(ChannelConfig config) {
        this.config = config;
        maxMessagePerRead = maxMessagesPerRead();
        totalMessages = totalBytesRead = 0;
    }

    // Allocate a ByteBuf based on the guessed number of bytes.
    @Override
    public ByteBuf allocate(ByteBufAllocator alloc) {
        return alloc.ioBuffer(guess());
    }

    // Increment the number of messages read; the cap keeps the I/O thread from starving other events.
    @Override
    public final void incMessagesRead(int amt) {
        totalMessages += amt;
    }

    // Record the bytes read last time and accumulate the total
    @Override
    public void lastBytesRead(int bytes) {
        lastBytesRead = bytes;
        if (bytes > 0) {
            totalBytesRead += bytes;
        }
    }

    @Override
    public final int lastBytesRead() {
        return lastBytesRead;
    }

    // Whether to continue the read loop
    @Override
    public boolean continueReading() {
        return continueReading(defaultMaybeMoreSupplier);
    }

    @Override
    public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
        return config.isAutoRead() &&
               (!respectMaybeMoreData || maybeMoreDataSupplier.get()) && // there may still be readable data
               totalMessages < maxMessagePerRead &&                     // the message count has not hit the cap
               totalBytesRead > 0;
    }

    @Override
    public void readComplete() {
    }

    @Override
    public int attemptedBytesRead() {
        return attemptedBytesRead;
    }

    // Set the number of bytes to attempt to read; defaults to the ByteBuf's writable bytes
    @Override
    public void attemptedBytesRead(int bytes) {
        attemptedBytesRead = bytes;
    }

    protected final int totalBytesRead() {
        return totalBytesRead < 0 ? Integer.MAX_VALUE : totalBytesRead;
    }
}
MaxMessageHandle decides whether to continue the loop based on whether each read filled the ByteBuf. If the ByteBuf was filled, the Channel may still have data waiting to be read; if not, the data has likely been drained and the loop can stop.
With the parent classes covered, let's look at the core class, AdaptiveRecvByteBufAllocator. Its source is not very long:
/* An adaptive receive-buffer allocator that allocates a ByteBuf with an appropriate initial
   capacity for data received from the peer. It avoids buffers that are too small (frequent
   expansion) or too large (wasted memory, GC pressure). Prediction is based on history:
   1. If the last read filled the ByteBuf completely, the next buffer is enlarged.
   2. If the read is below a threshold twice in a row, the next buffer is shrunk. */
public class AdaptiveRecvByteBufAllocator extends DefaultMaxMessagesRecvByteBufAllocator {

    // Default minimum capacity
    static final int DEFAULT_MINIMUM = 64;
    // Default initial capacity
    static final int DEFAULT_INITIAL = 2048;
    // Default maximum capacity
    static final int DEFAULT_MAXIMUM = 65536;

    // Index step used when the next allocated buffer needs to grow
    private static final int INDEX_INCREMENT = 4;
    // Index step used when the next allocated buffer needs to shrink
    private static final int INDEX_DECREMENT = 1;

    // Capacity table
    private static final int[] SIZE_TABLE;

    static {
        List<Integer> sizeTable = new ArrayList<Integer>();
        // Below 512 bytes, increment in 16-byte steps
        for (int i = 16; i < 512; i += 16) {
            sizeTable.add(i);
        }
        // From 512 bytes on, double the capacity each step
        for (int i = 512; i > 0; i <<= 1) { // lgtm[java/constant-comparison]
            sizeTable.add(i);
        }
        // List to array
        SIZE_TABLE = new int[sizeTable.size()];
        for (int i = 0; i < SIZE_TABLE.length; i++) {
            SIZE_TABLE[i] = sizeTable.get(i);
        }
    }

    /**
     * @deprecated There is state for {@link #maxMessagesPerRead()} which is typically based upon channel type.
     */
    @Deprecated
    public static final AdaptiveRecvByteBufAllocator DEFAULT = new AdaptiveRecvByteBufAllocator();

    /* Binary search for the index of the given size. If size is not in SIZE_TABLE,
       return the index of the closest smaller entry. */
    private static int getSizeTableIndex(final int size) {
        for (int low = 0, high = SIZE_TABLE.length - 1;;) {
            if (high < low) {
                return low;
            }
            if (high == low) {
                return high;
            }
            int mid = low + high >>> 1;
            int a = SIZE_TABLE[mid];
            int b = SIZE_TABLE[mid + 1];
            if (size > b) {
                low = mid + 1;
            } else if (size < a) {
                high = mid - 1;
            } else if (size == a) {
                return mid;
            } else {
                return mid + 1;
            }
        }
    }

    private final class HandleImpl extends MaxMessageHandle {
        // Minimum capacity index
        private final int minIndex;
        // Maximum capacity index
        private final int maxIndex;
        // Index of the current capacity
        private int index;
        // Size of the next receive buffer
        private int nextReceiveBufferSize;
        // Whether to shrink on the next small read. Shrinking requires two consecutive
        // small reads: the first sets this flag, the second actually shrinks.
        private boolean decreaseNow;

        HandleImpl(int minIndex, int maxIndex, int initial) {
            this.minIndex = minIndex;
            this.maxIndex = maxIndex;
            // Look up the SIZE_TABLE index for the initial capacity
            index = getSizeTableIndex(initial);
            // The initial buffer size is the default value
            nextReceiveBufferSize = SIZE_TABLE[index];
        }

        /* Called by unsafe.read() on each loop iteration; bytes is the actual number
           of bytes read last time. */
        @Override
        public void lastBytesRead(int bytes) {
            /* attemptedBytesRead(): the number of bytes we tried to read.
               NioSocketChannel.doReadBytes() sets it to ByteBuf.writableBytes(), i.e. as long
               as there is data to read, Netty tries to fill the ByteBuf. */
            if (bytes == attemptedBytesRead()) {
                // The read filled the buffer completely, so consider expanding.
                record(bytes);
            }
            // Call the superclass method to accumulate totalBytesRead
            super.lastBytesRead(bytes);
        }

        /* Predict the size of the buffer to allocate next time */
        @Override
        public int guess() {
            return nextReceiveBufferSize;
        }

        /* Adaptively adjust the size of the next allocated buffer based on the actual bytes read */
        private void record(int actualReadBytes) {
            // If the bytes read are small enough twice in a row, shrink
            if (actualReadBytes <= SIZE_TABLE[max(0, index - INDEX_DECREMENT)]) {
                // Shrink by an index step of 1; requires two consecutive triggers, hence decreaseNow
                if (decreaseNow) {
                    index = max(index - INDEX_DECREMENT, minIndex); // never below the minimum
                    nextReceiveBufferSize = SIZE_TABLE[index];
                    decreaseNow = false;
                } else {
                    decreaseNow = true;
                }
            } else if (actualReadBytes >= nextReceiveBufferSize) {
                /* Expand by an index step of 4, capped at the maximum. With the defaults the
                   expansion sequence is 2048 -> 32768 -> 65536 -> 65536 (unchanged),
                   i.e. 2K -> 32K -> 64K ... */
                index = min(index + INDEX_INCREMENT, maxIndex);
                nextReceiveBufferSize = SIZE_TABLE[index];
                decreaseNow = false;
            }
        }

        @Override
        public void readComplete() {
            // After the read loop finishes, adjust adaptively based on the total bytes read this time
            record(totalBytesRead());
        }
    }

    // Indices of the minimum and maximum capacities in SIZE_TABLE, plus the initial capacity
    private final int minIndex;
    private final int maxIndex;
    private final int initial;

    // Create an adaptive receive-buffer allocator with the defaults: 64, 2048, 65536
    public AdaptiveRecvByteBufAllocator() {
        this(DEFAULT_MINIMUM, DEFAULT_INITIAL, DEFAULT_MAXIMUM);
    }

    // Create an adaptive receive-buffer allocator with the given minimum, initial, and maximum capacities
    public AdaptiveRecvByteBufAllocator(int minimum, int initial, int maximum) {
        checkPositive(minimum, "minimum");
        if (initial < minimum) {
            throw new IllegalArgumentException("initial: " + initial);
        }
        if (maximum < initial) {
            throw new IllegalArgumentException("maximum: " + maximum);
        }
        int minIndex = getSizeTableIndex(minimum);
        if (SIZE_TABLE[minIndex] < minimum) {
            this.minIndex = minIndex + 1;
        } else {
            this.minIndex = minIndex;
        }
        int maxIndex = getSizeTableIndex(maximum);
        if (SIZE_TABLE[maxIndex] > maximum) {
            this.maxIndex = maxIndex - 1;
        } else {
            this.maxIndex = maxIndex;
        }
        this.initial = initial;
    }

    @SuppressWarnings("deprecation")
    @Override
    public Handle newHandle() {
        return new HandleImpl(minIndex, maxIndex, initial);
    }

    @Override
    public AdaptiveRecvByteBufAllocator respectMaybeMoreData(boolean respectMaybeMoreData) {
        super.respectMaybeMoreData(respectMaybeMoreData);
        return this;
    }
}
As the name suggests, AdaptiveRecvByteBufAllocator is an adaptive receive-buffer allocator; the "adaptive" part lies in how it sizes the ByteBuf it allocates.
It uses three attributes to define the initial, minimum, and maximum ByteBuf capacities, and an int array SIZE_TABLE as the capacity table: below 512 bytes, capacities step by 16 bytes; from 512 bytes on, they double. lastBytesRead() records the actual number of bytes read in each loop iteration; if the read filled the ByteBuf, record(bytes) is called to expand. The expansion strategy moves the capacity index forward by 4, which in the doubling region multiplies the capacity by 16:
2048 -> 32768 -> 65536 (2K -> 32K -> 64K, capped at the maximum)
If the number of bytes read is small enough twice in a row, the allocator shrinks. The shrink strategy moves the capacity index back by 1: above 512 bytes that halves the capacity, below 512 bytes it subtracts 16 bytes.
Since shrinking requires two consecutive small reads, the decreaseNow flag records whether to shrink immediately: the first small read sets it to true, the second actually shrinks.
Why two in a row? A single small read may just be the tail of a large packet, in which case the buffer should not shrink. Two consecutive small reads suggest that a complete packet really is small, so the next ByteBuf can be smaller to save memory.
At the end of the loop, allocHandle.readComplete() is called, which adjusts dynamically based on the total number of bytes read this time to provide the prediction for the next ByteBuf allocation. For example, if one OP_READ event took three reads of 100KB each, next time a ByteBuf big enough for all 300KB (subject to the configured maximum) would be allocated so the data can be read in one pass. Conversely, if few bytes were read, the buffer shrinks to save memory.
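The grow/shrink policy above can be verified with a stripped-down, JDK-only simulation. This is not Netty's actual class, just a sketch of the same SIZE_TABLE lookup and record() rules (INDEX_INCREMENT=4, INDEX_DECREMENT=1, decreaseNow), with the capacity table truncated at 64K for the demo:

```java
import java.util.ArrayList;
import java.util.List;

public class AdaptiveGuessSimulation {
    // Rebuild of the capacity table: 16..496 in 16-byte steps, then doubling from 512.
    static final int[] SIZE_TABLE;
    static {
        List<Integer> t = new ArrayList<>();
        for (int i = 16; i < 512; i += 16) t.add(i);
        for (int i = 512; i <= 65536; i <<= 1) t.add(i);
        SIZE_TABLE = t.stream().mapToInt(Integer::intValue).toArray();
    }

    int index = indexOf(2048); // start at the default initial capacity
    boolean decreaseNow;

    // Linear lookup for clarity (Netty uses a binary search).
    static int indexOf(int size) {
        for (int i = 0; i < SIZE_TABLE.length; i++) {
            if (SIZE_TABLE[i] >= size) return i;
        }
        return SIZE_TABLE.length - 1;
    }

    // Same policy as record(): shrink one index step after two consecutive
    // small reads; jump four index steps when the buffer was filled.
    void record(int actualReadBytes) {
        if (actualReadBytes <= SIZE_TABLE[Math.max(0, index - 1)]) {
            if (decreaseNow) {
                index = Math.max(index - 1, 0);
                decreaseNow = false;
            } else {
                decreaseNow = true;
            }
        } else if (actualReadBytes >= SIZE_TABLE[index]) {
            index = Math.min(index + 4, SIZE_TABLE.length - 1);
            decreaseNow = false;
        }
    }

    int guess() { return SIZE_TABLE[index]; }

    public static void main(String[] args) {
        AdaptiveGuessSimulation h = new AdaptiveGuessSimulation();
        System.out.println(h.guess()); // 2048, the initial guess
        h.record(2048);                // buffer filled -> jump 4 index steps
        System.out.println(h.guess()); // 32768
        h.record(100);                 // first small read: only set decreaseNow
        System.out.println(h.guess()); // still 32768
        h.record(100);                 // second small read in a row -> shrink one step
        System.out.println(h.guess()); // 16384
    }
}
```

Running it shows the asymmetry by design: the guess grows aggressively (x16 per jump) but shrinks cautiously (halving, and only after two small reads).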
Conclusion
When receiving data from the peer, a Channel does not know how large a ByteBuf it should allocate, so it delegates ByteBuf allocation to a RecvByteBufAllocator, expecting it to allocate a ByteBuf large enough to hold the data yet small enough not to waste much memory. The default implementation is AdaptiveRecvByteBufAllocator, which adaptively adjusts the size of the next allocated ByteBuf based on the actual number of bytes read previously.
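The allocator is also a per-channel configuration knob. A hedged configuration sketch (bossGroup, workerGroup, and MyHandler are placeholders; ChannelOption.RCVBUF_ALLOCATOR is Netty's real option key, and 64/2048/65536 are the documented defaults):

```java
// Configuration sketch: install an explicitly-bounded AdaptiveRecvByteBufAllocator
// on every accepted child Channel.
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
 .channel(NioServerSocketChannel.class)
 .childOption(ChannelOption.RCVBUF_ALLOCATOR,
              new AdaptiveRecvByteBufAllocator(64, 2048, 65536))
 .childHandler(new ChannelInitializer<SocketChannel>() {
     @Override
     protected void initChannel(SocketChannel ch) {
         ch.pipeline().addLast(new MyHandler()); // placeholder handler
     }
 });
```

Raising the minimum or initial capacity may make sense for workloads whose typical message size is known in advance.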
Although AdaptiveRecvByteBufAllocator tries to predict the right size for the next ByteBuf, predictions sometimes miss, so the Channel reads in a loop to compensate for a ByteBuf that turned out too small to hold all the data. However, to keep the I/O thread from starving other Channels' events, Netty by default caps each read loop at 16 iterations. If the packet is so large that 16 reads do not drain it, Netty gives up for now and continues on the next select() poll.
One consequence: channelRead() firing multiple times is not necessarily caused by TCP fragmentation; an undersized ByteBuf produces the same effect, so a single channelRead() call does not guarantee a complete packet.
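Because of this, business handlers should not assume one channelRead() equals one application message. The usual remedy is a frame decoder in front of them. A hedged sketch assuming a protocol with a 4-byte length prefix (LengthFieldBasedFrameDecoder is Netty's real decoder; the handler body and `ch` are placeholders):

```java
// Sketch: re-assemble complete frames before business logic. Parameters:
// max frame 64KB, length field at offset 0, 4 bytes long, strip the prefix.
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new LengthFieldBasedFrameDecoder(65536, 0, 4, 0, 4));
pipeline.addLast(new SimpleChannelInboundHandler<ByteBuf>() {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf frame) {
        // frame now holds exactly one complete message, however the bytes
        // arrived across reads
    }
});
```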