This is the fourth day of my participation in the August More text Challenge. For details, see:August is more challenging
First, the source code entry
In the last article we looked at the source code for several typical built-in decoders. In this lesson we will look at the data writing process during data communication.
Let’s take the code from the previous example:
Let’s go straight to the topic and analyze the following source code:
ctx.writeAndFlush(byteBuf);
Copy the code
@Override
public ChannelFuture writeAndFlush(Object msg) {
return writeAndFlush(msg, newPromise());
}
Copy the code
@Override
public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
write(msg, true, promise);
return promise;
}
Copy the code
Write (MSG, true, promise); The second argument is true. Why true? Since we are calling writeAndFlush, it is true, and false without flush!
private void write(Object msg, boolean flush, ChannelPromise promise) {... Ignore...// Find the corresponding handler based on the mask
final AbstractChannelHandlerContext next = findContextOutbound(flush ?
(MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
final Object m = pipeline.touch(msg, next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
// Whether writeAndFlush is called
if (flush) {
next.invokeWriteAndFlush(m, promise);
} else {
// Whether write is callednext.invokeWrite(m, promise); }}else{... Ignore... }}Copy the code
In order to facilitate the analysis, we will continue to conduct the analysis in a synchronous way:
final AbstractChannelHandlerContext next = findContextOutbound(flush ? (MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
Copy the code
Start from the current node and look up to the node that implements write and flush. By default, HeadContext is implemented, so it must be HeadContext!
Second, source code analysis
We call writeAndflush, so flush is true, so it goes into the if logic:
next.invokeWriteAndFlush(m, promise);
Copy the code
void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
if (invokeHandler()) {
// Join the write queue
invokeWrite0(msg, promise);
// Flush the buffer
invokeFlush0();
} else{ writeAndFlush(msg, promise); }}Copy the code
InvokeHandler () generally returns true, so we enter invokeWrite0:
1. Append to the queue
private void invokeWrite0(Object msg, ChannelPromise promise) {
try {
((ChannelOutboundHandler) handler()).write(this, msg, promise);
} catch(Throwable t) { notifyOutboundHandlerException(t, promise); }}Copy the code
Our previous analysis, the handler must be HeadContext, so we entered io.net ty. Channel. DefaultChannelPipeline. HeadContext# write:
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
unsafe.write(msg, promise);
}
Copy the code
@Override
public final void write(Object msg, ChannelPromise promise) {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; . Ignore...int size;
try {
// Convert bytebuf to out-of-heap memory
//io.netty.channel.nio.AbstractNioByteChannel.filterOutboundMessage
msg = filterOutboundMessage(msg);
size = pipeline.estimatorHandle().size(msg);
if (size < 0) {
size = 0; }}catch (Throwable t) {
safeSetFailure(promise, t);
ReferenceCountUtil.release(msg);
return;
}
// Insert data into the write queue
outboundBuffer.addMessage(msg, size, promise);
}
Copy the code
-
Netty uses out-of-heap memory by default, even if you create an in-heap memory Netty will forcibly convert it to out-of-heap memory (as follows):
msg = filterOutboundMessage(msg); Copy the code
@Override protected final Object filterOutboundMessage(Object msg) { if (msg instanceof ByteBuf) { ByteBuf buf = (ByteBuf) msg; // Check whether it is out of heap memory if (buf.isDirect()) { // Out-of-heap memory is returned without conversion return msg; } // Create an out-of-heap memory to write data to the out-of-heap memory returnnewDirectBuffer(buf); }... }Copy the code
-
Determine the number of readable bytes:
size = pipeline.estimatorHandle().size(msg); Copy the code
-
Data is appended to the write queue
// Insert data into the write queue outboundBuffer.addMessage(msg, size, promise); Copy the code
public void addMessage(Object msg, int size, ChannelPromise promise) { // Encapsulate BuytBuf as an Entity Entry entry = Entry.newInstance(msg, size, total(msg), promise); // When tailEntity is Null if (tailEntry == null) { // flushedEntry is initialized flushedEntry = null; } else { Entry tail = tailEntry; // Point the current tail node to the new node tail.next = entry; } // Assign the current new node as the tail node tailEntry = entry; if (unflushedEntry == null) { //unflushedEntry also refers to the new entry node unflushedEntry = entry; } // After the message is added to the unflushed array, add the bytes to be processed. // See https://github.com/netty/netty/issues/1619 incrementPendingOutboundBytes(entry.pendingSize, false); } Copy the code
First, append the node. This append is to refresh the node. Let’s see the illustration:
In fact, the data is not written to the channel at this stage. Instead, it is appended to a linked list designed by Netty, which is appended consistently as long as the flush method is not called.
-
TailEntry always points to the latest data, and unflushedEnrty always points to the head node!
-
Each time you append a node, that node is appended to a reference to the previous node next!
After each append, record the number of bytes that are currently to be refreshed:
incrementPendingOutboundBytes(entry.pendingSize, false); Copy the code
private void incrementPendingOutboundBytes(long size, boolean invokeLater) { if (size == 0) { return; } TOTAL_PENDING_SIZE_UPDATER long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size); // Wait to write section > 64 * 1024 if(newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) { setUnwritable(invokeLater); }}Copy the code
This will determine whether the current delegate section exceeds 64 * 1024
private void setUnwritable(boolean invokeLater) { for (;;) { final int oldValue = unwritable; final int newValue = oldValue | 1; if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) { if (oldValue == 0&& newValue ! =0) { / / spread ChannelWritabilityChanged invokeLater () event fireChannelWritabilityChanged(invokeLater); } break; }}}Copy the code
The main logic is when the number of bytes to write more than 64 k when people will spread a ChannelWritabilityChanged event!
-
2. Append to the refresh queue
We went back to the main task: io.net ty. Channel. AbstractChannelHandlerContext# invokeWriteAndFlush
invokeFlush0();
Copy the code
private void invokeFlush0(a) {
try {
((ChannelOutboundHandler) handler()).flush(this);
} catch(Throwable t) { notifyHandlerException(t); }}Copy the code
Same as above, the handler is also HeadContext here, we enter: io.net ty. Channel. DefaultChannelPipeline. HeadContext# flush:
@Override
public final void flush(a) {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
return;
}
// Add to the flush buffer
outboundBuffer.addFlush();
// Start flushing to the pipe
flush0();
}
Copy the code
Two things are done here:
- Moves data from the write queue to the refresh queue
- Writes the data from the refresh queue to the Socket channel
I. Transfer the data from the write queue to the refresh queue
// Add to the flush buffer
outboundBuffer.addFlush();
Copy the code
public void addFlush(a) {
// Fetch the first node waiting to flush the queue
Entry entry = unflushedEntry;
if(entry ! =null) {
// When first added
if (flushedEntry == null) {
// There is no flushedEntry yet, so start with the entry
// Set the refresh queue equal to the current node to be refreshed
flushedEntry = entry;
}
do {
/ / since the increase
flushed ++;
if(! entry.promise.setUncancellable()) {// Cancelled, so make sure we free the memory and notify the freed bytes
int pending = entry.cancel();
// Reduce the number of bytes to be written.
decrementPendingOutboundBytes(pending, false.true);
}
// Continue to process the data
entry = entry.next;
} while(entry ! =null);
// All have been refreshed, so reset to not refreshed
unflushedEntry = null; }}Copy the code
This logic is not hard to start with. We previously wrote the queue structure as follows:
Let’s look at this logic:
Entry entry = unflushedEntry;
if(entry ! =null) {
// When first added
if (flushedEntry == null) {
// There is no flushedEntry yet, so start with the entry
// Set the refresh queue equal to the current node to be refreshed
flushedEntry = entry;
}
Copy the code
} flushedEntry = Null;} flushedEntry = entry;}
To start recycling the consumption tape to refresh queue:
do {
/ / since the increase
flushed ++;
if(! entry.promise.setUncancellable()) {// Cancelled, so make sure we free the memory and notify the freed bytes
int pending = entry.cancel();
// Reduce the number of bytes to be written.
decrementPendingOutboundBytes(pending, false.true);
}
// Continue to process the data
entry = entry.next;
} while(entry ! =null);
Copy the code
-
Here will continue to consumption, when the amount of data is less than 32 k, will also trigger ChannelWritabilityChanged
-
decrementPendingOutboundBytes(pending, false.true); Copy the code
-
private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) { if (size == 0) { return; } long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size); // The number of nodes to be processed is smaller than 32 x 1024 if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) { //ChannelWritabilityChanged(invokeLater);setWritable(invokeLater); }}Copy the code
-
This source code can try with themselves, and the above basic consistent, one is + one is –
// All have been refreshed, so reset to not refreshed
unflushedEntry = null;
Copy the code
At this point, the structure diagram becomes:
The overall refresh queue is as follows:
II. Write data to the refresh queue
// Start flushing to the pipe
flush0();
Copy the code
@Override
protected final void flush0(a) {
if(! isFlushPending()) {super.flush0(); }}Copy the code
protected void flush0(a) {... Ignore...try {
// Start writing
doWrite(outboundBuffer);
} catch(Throwable t) { ................. Ignore... }}Copy the code
Here we focus on: doWrite(outboundBuffer);
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
int writeSpinCount = config().getWriteSpinCount();
do {
// Get the header data of the current refresh queueObject msg = in.current(); .// Write continuously
// Start writing data to the JDK pipeline
writeSpinCount -= doWriteInternal(in, msg);
} while (writeSpinCount > 0);
incompleteWrite(writeSpinCount < 0);
}
Copy the code
Here we start the flush queue before the loop consumption, calling doWriteInternal to write:
private int doWriteInternal(ChannelOutboundBuffer in, Object msg) throws Exception {
if (msg instanceofByteBuf) { ByteBuf buf = (ByteBuf) msg; .// Start writing
final intlocalFlushedAmount = doWriteBytes(buf); .// The disk is written
} else if (msg instanceof FileRegion) {
...................................
}
return WRITE_STATUS_SNDBUF_FULL;
}
Copy the code
Let’s go to :doWriteBytes(buf); Methods:
@Override
protected int doWriteBytes(ByteBuf buf) throws Exception {
// Return the readable bytes
final int expectedWrittenBytes = buf.readableBytes();
// Write buf to the pipe
return buf.readBytes(javaChannel(), expectedWrittenBytes);
}
Copy the code
Get the current number of bytes that can be read and then write to the channel through the channel object returned by javaChannel(). Thus an entity is written to the Socket channel!
Third, summary
- Write data to the write queue first
- Moves an Entity from the Write queue to the Flush queue
- Write the ByteBuf of the Entity in flush queue to the Socket Channel through the underlying JDK Channel!