This is the fourth day of my participation in the August More Text Challenge. For details, see: August More Text Challenge.

First, the source code entry point

In the last article we looked at the source code of several typical built-in decoders. In this article we will look at the data-writing process during communication.

Let's pick up where the previous example left off and go straight to the topic, analyzing the following call:

ctx.writeAndFlush(byteBuf);

@Override
public ChannelFuture writeAndFlush(Object msg) {
    return writeAndFlush(msg, newPromise());
}
@Override
public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
    write(msg, true, promise);
    return promise;
}

In write(msg, true, promise), the second argument is the flush flag. Why true? Because we called writeAndFlush; a plain write(...) passes false and does not flush!
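For comparison, the plain write(...) overloads in AbstractChannelHandlerContext pass false as the flush flag, which is why write alone never flushes:

@Override
public ChannelFuture write(Object msg) {
    return write(msg, newPromise());
}

@Override
public ChannelFuture write(Object msg, ChannelPromise promise) {
    write(msg, false, promise);
    return promise;
}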

private void write(Object msg, boolean flush, ChannelPromise promise) {
    ................. Ignore .................
    // Find the next outbound handler based on the mask
    final AbstractChannelHandlerContext next = findContextOutbound(flush ?
                                                                   (MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
    final Object m = pipeline.touch(msg, next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        // writeAndFlush was called
        if (flush) {
            next.invokeWriteAndFlush(m, promise);
        } else {
            // plain write was called
            next.invokeWrite(m, promise);
        }
    } else {
        ................. Ignore .................
    }
}

To make things easier to follow, we will continue the analysis along the synchronous (in-event-loop) path:

final AbstractChannelHandlerContext next = findContextOutbound(flush ? (MASK_WRITE | MASK_FLUSH) : MASK_WRITE);

Starting from the current node, this walks backwards through the pipeline to the next handler whose mask includes write and flush. HeadContext implements both, so if no other outbound handler intercepts the operation, the search ends at HeadContext!
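For reference, findContextOutbound is roughly the following backward walk over the pipeline (simplified from the Netty source; newer versions add a few extra skip checks):

private AbstractChannelHandlerContext findContextOutbound(int mask) {
    AbstractChannelHandlerContext ctx = this;
    do {
        // Walk towards the head of the pipeline
        ctx = ctx.prev;
        // Stop at the first context whose handler declares this outbound operation
    } while ((ctx.executionMask & mask) == 0);
    return ctx;
}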

Second, source code analysis

We called writeAndFlush, so flush is true and execution enters the if branch:

next.invokeWriteAndFlush(m, promise);
void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
    if (invokeHandler()) {
        // Join the write queue
        invokeWrite0(msg, promise);
        // Flush the buffer
        invokeFlush0();
    } else {
        writeAndFlush(msg, promise);
    }
}
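The invokeHandler() guard at the top is this small check from AbstractChannelHandlerContext (it verifies that handlerAdded has completed):

private boolean invokeHandler() {
    // Store in a local variable to reduce volatile reads.
    int handlerState = this.handlerState;
    return handlerState == ADD_COMPLETE || (!ordered && handlerState == ADD_PENDING);
}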

invokeHandler() generally returns true (the handler has been fully added to the pipeline by the time data flows through it), so we enter invokeWrite0:

1. Append to the write queue

private void invokeWrite0(Object msg, ChannelPromise promise) {
    try {
        ((ChannelOutboundHandler) handler()).write(this, msg, promise);
    } catch (Throwable t) {
        notifyOutboundHandlerException(t, promise);
    }
}

As analyzed above, the handler here is HeadContext, so we land in io.netty.channel.DefaultChannelPipeline.HeadContext#write:

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
    unsafe.write(msg, promise);
}

@Override
public final void write(Object msg, ChannelPromise promise) {
    assertEventLoop();

    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    ................. Ignore .................
    int size;
    try {
        // Convert the ByteBuf to direct (off-heap) memory if necessary
        // io.netty.channel.nio.AbstractNioByteChannel.filterOutboundMessage
        msg = filterOutboundMessage(msg);
        size = pipeline.estimatorHandle().size(msg);
        if (size < 0) {
            size = 0;
        }
    } catch (Throwable t) {
        safeSetFailure(promise, t);
        ReferenceCountUtil.release(msg);
        return;
    }
    // Insert the data into the write queue
    outboundBuffer.addMessage(msg, size, promise);
}
  • Netty uses direct (off-heap) memory by default: even if you allocate a heap ByteBuf, Netty will convert it to a direct buffer before writing (as follows):

    msg = filterOutboundMessage(msg);

    @Override
    protected final Object filterOutboundMessage(Object msg) {
        if (msg instanceof ByteBuf) {
            ByteBuf buf = (ByteBuf) msg;
            // Check whether it is already direct (off-heap) memory
            if (buf.isDirect()) {
                // Direct memory is returned without conversion
                return msg;
            }
            // Copy the data into a newly allocated direct buffer
            return newDirectBuffer(buf);
        }
        ... Ignore ...
    }
  • Estimate the number of bytes of the message:

    size = pipeline.estimatorHandle().size(msg);
  • Append the data to the write queue:

    // Insert data into the write queue
    outboundBuffer.addMessage(msg, size, promise);
    public void addMessage(Object msg, int size, ChannelPromise promise) {
        // Wrap the ByteBuf in an Entry
        Entry entry = Entry.newInstance(msg, size, total(msg), promise);
        // The queue is empty: tailEntry is null
        if (tailEntry == null) {
            // Reset flushedEntry
            flushedEntry = null;
        } else {
            Entry tail = tailEntry;
            // Point the current tail node at the new node
            tail.next = entry;
        }
        // Make the new node the tail node
        tailEntry = entry;
        if (unflushedEntry == null) {
            // unflushedEntry also points at the new entry node
            unflushedEntry = entry;
        }

        // After the message was added to the unflushed list, update the pending bytes.
        // See https://github.com/netty/netty/issues/1619
        incrementPendingOutboundBytes(entry.pendingSize, false);
    }

    First, the node is appended; appending alone does not flush anything.

    In fact, the data is not written to the channel at this stage. Instead, it is appended to a linked list maintained by Netty's ChannelOutboundBuffer, and it keeps growing as long as the flush method is not called.

    • tailEntry always points to the most recently added node, and unflushedEntry always points to the head (first unflushed) node!

    • Each time a node is appended, it is linked through the next reference of the previous tail node!

    After each append, the number of bytes waiting to be flushed is recorded:

    incrementPendingOutboundBytes(entry.pendingSize, false);
    private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
        if (size == 0) {
            return;
        }

        long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
        // Pending bytes > 64 * 1024 (the default high water mark)
        if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
            setUnwritable(invokeLater);
        }
    }

    This checks whether the pending bytes exceed 64 * 1024, the default high water mark:

    private void setUnwritable(boolean invokeLater) {
        for (;;) {
            final int oldValue = unwritable;
            final int newValue = oldValue | 1;
            if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
                if (oldValue == 0 && newValue != 0) {
                    // Propagate the channelWritabilityChanged(invokeLater) event
                    fireChannelWritabilityChanged(invokeLater);
                }
                break;
            }
        }
    }

    The main logic: once the number of pending bytes exceeds 64 KB, the channel is marked unwritable and a channelWritabilityChanged event is propagated through the pipeline!
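    As a practical aside, these thresholds are configurable, and a well-behaved writer checks writability before queueing more data. A minimal sketch, assuming an existing ServerBootstrap named bootstrap (the 32 KB / 64 KB values are Netty's defaults, spelled out for illustration):

    // Low water mark 32 KB, high water mark 64 KB (the defaults)
    bootstrap.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,
            new WriteBufferWaterMark(32 * 1024, 64 * 1024));

    // Inside a handler: only keep writing while the channel is writable,
    // otherwise the pending bytes just pile up past the high water mark
    if (ctx.channel().isWritable()) {
        ctx.writeAndFlush(msg);
    }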

2. Append to the flush queue

Back to the main flow: io.netty.channel.AbstractChannelHandlerContext#invokeWriteAndFlush

invokeFlush0();
private void invokeFlush0() {
    try {
        ((ChannelOutboundHandler) handler()).flush(this);
    } catch (Throwable t) {
        notifyHandlerException(t);
    }
}

As above, the handler here is also HeadContext, so we enter io.netty.channel.DefaultChannelPipeline.HeadContext#flush:

@Override
public final void flush() {
    assertEventLoop();

    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null) {
        return;
    }
    // Move the write queue into the flush queue
    outboundBuffer.addFlush();
    // Start flushing to the channel
    flush0();
}

Two things are done here:

  1. Move the data from the write queue to the flush queue
  2. Write the data in the flush queue to the Socket channel

I. Transfer the data from the write queue to the flush queue

// Add to the flush buffer
outboundBuffer.addFlush();
public void addFlush() {
    // Take the first node of the unflushed queue
    Entry entry = unflushedEntry;
    if (entry != null) {
        // On the first addFlush
        if (flushedEntry == null) {
            // There is no flushedEntry yet, so start with this entry:
            // the flush queue now begins at the current node
            flushedEntry = entry;
        }
        do {
            // Increment the flushed counter
            flushed++;
            if (!entry.promise.setUncancellable()) {
                // Was cancelled, so make sure we free the memory and notify about the freed bytes
                int pending = entry.cancel();
                // Reduce the number of bytes to be written
                decrementPendingOutboundBytes(pending, false, true);
            }
            // Move on to the next node
            entry = entry.next;
        } while (entry != null);

        // Everything is now scheduled to be flushed, so reset unflushedEntry
        unflushedEntry = null;
    }
}

This logic is not hard to follow, given the write-queue structure we built up above.

Let’s look at this logic:

Entry entry = unflushedEntry;
if (entry != null) {
    // On the first addFlush
    if (flushedEntry == null) {
        // There is no flushedEntry yet, so start with this entry:
        // the flush queue now begins at the current node
        flushedEntry = entry;
    }

In other words: if there are unflushed entries and flushedEntry is still null, flushedEntry is pointed at the first unflushed node.

Then the nodes waiting to be flushed are consumed in a loop:

do {
    // Increment the flushed counter
    flushed++;
    if (!entry.promise.setUncancellable()) {
        // Was cancelled, so make sure we free the memory and notify about the freed bytes
        int pending = entry.cancel();
        // Reduce the number of bytes to be written
        decrementPendingOutboundBytes(pending, false, true);
    }
    // Move on to the next node
    entry = entry.next;
} while (entry != null);
  • Consumption continues here; when the amount of pending data drops below 32 KB, a channelWritabilityChanged event is also triggered:

    decrementPendingOutboundBytes(pending, false, true);
    private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) {
        if (size == 0) {
            return;
        }

        long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
        // Pending bytes dropped below 32 * 1024 (the default low water mark)
        if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) {
            // Mark the channel writable again and fire channelWritabilityChanged(invokeLater)
            setWritable(invokeLater);
        }
    }
  • You can walk through this source yourself; it mirrors incrementPendingOutboundBytes above, one adds and the other subtracts.

// All have been refreshed, so reset to not refreshed
unflushedEntry = null;

At this point the structure changes: flushedEntry now points to the head of the list, unflushedEntry has been reset to null, and tailEntry is unchanged, so the entire list now forms the flush queue.
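A handler can react to the writability flips described in this section. A minimal sketch of such a handler (the class name is illustrative):

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class BackpressureHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) {
        if (ctx.channel().isWritable()) {
            // Pending bytes fell below the low water mark (32 KB): safe to write again
            System.out.println("channel writable again");
        } else {
            // Pending bytes exceeded the high water mark (64 KB): back off for now
            System.out.println("channel not writable, backing off");
        }
        // Keep propagating the event to later handlers
        ctx.fireChannelWritabilityChanged();
    }
}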

II. Write the data in the flush queue to the channel

// Start flushing to the pipe
flush0();

@Override
protected final void flush0() {
    if (!isFlushPending()) {
        super.flush0();
    }
}

protected void flush0() {
    ................. Ignore .................
    try {
        // Start writing
        doWrite(outboundBuffer);
    } catch (Throwable t) {
        ................. Ignore .................
    }
}

Here we focus on: doWrite(outboundBuffer);

@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    int writeSpinCount = config().getWriteSpinCount();
    do {
        // Get the head node of the current flush queue
        Object msg = in.current();
        ................. Ignore .................
        // Keep writing: hand the data to the JDK channel
        writeSpinCount -= doWriteInternal(in, msg);
    } while (writeSpinCount > 0);

    incompleteWrite(writeSpinCount < 0);
}
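writeSpinCount (16 by default) caps how many write attempts one flush makes, so a single busy channel cannot monopolize the event loop; whatever could not be written is handed to incompleteWrite, which schedules the remainder for later. The cap can be tuned through a channel option; a minimal sketch, assuming an existing Bootstrap named bootstrap:

// Raising WRITE_SPIN_COUNT lets one flush push more data per event-loop pass,
// at the cost of fairness towards other channels on the same loop (default: 16)
bootstrap.option(ChannelOption.WRITE_SPIN_COUNT, 16);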

Here the flush queue is consumed in a loop, each pass calling doWriteInternal to write the current message:

private int doWriteInternal(ChannelOutboundBuffer in, Object msg) throws Exception {
    if (msg instanceof ByteBuf) {
        ByteBuf buf = (ByteBuf) msg;
        ................. Ignore .................
        // Start writing
        final int localFlushedAmount = doWriteBytes(buf);
        ................. Ignore .................
    } else if (msg instanceof FileRegion) {
        ...................................
    }
    return WRITE_STATUS_SNDBUF_FULL;
}

Let's step into the doWriteBytes(buf) method:

@Override
protected int doWriteBytes(ByteBuf buf) throws Exception {
    // Return the readable bytes
    final int expectedWrittenBytes = buf.readableBytes();
    // Write buf to the pipe
    return buf.readBytes(javaChannel(), expectedWrittenBytes);
}

It takes the current number of readable bytes and writes them to the channel object returned by javaChannel(). Thus one Entry's ByteBuf is written to the Socket channel!
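Note that readBytes may write fewer bytes than requested when the socket send buffer is full; the returned count is what actually went out, which the caller uses to decide whether the write is complete. A minimal standalone sketch of the same call, assuming a placeholder endpoint at 127.0.0.1:8080:

import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

public class ReadBytesDemo {
    public static void main(String[] args) throws Exception {
        // Placeholder endpoint; point it at a real server to run this
        SocketChannel ch = SocketChannel.open(new InetSocketAddress("127.0.0.1", 8080));

        ByteBuf buf = Unpooled.directBuffer();
        buf.writeBytes("hello".getBytes(StandardCharsets.UTF_8));

        // Same call shape as doWriteBytes: drains up to readableBytes() into the
        // channel and advances readerIndex by however many bytes were written
        int written = buf.readBytes(ch, buf.readableBytes());
        System.out.println("wrote " + written + " bytes");

        buf.release();
        ch.close();
    }
}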

Third, summary

  1. Data is first appended to the write queue
  2. addFlush moves the Entry nodes from the write queue to the flush queue
  3. The ByteBuf of each Entry in the flush queue is written to the Socket channel through the underlying JDK channel!
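To close, a minimal sketch of the whole path from the caller's side, with a listener observing the outcome of the write (the message content is illustrative):

// Inside some handler; ctx is the ChannelHandlerContext
ByteBuf byteBuf = ctx.alloc().directBuffer(); // already direct, so filterOutboundMessage returns it as-is
byteBuf.writeBytes("hi netty".getBytes(StandardCharsets.UTF_8));

// write puts it on the write queue, flush moves it to the flush queue
// and drains it into the JDK channel, exactly as traced above
ctx.writeAndFlush(byteBuf).addListener(future -> {
    if (!future.isSuccess()) {
        future.cause().printStackTrace();
    }
});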