The previous article analyzed the source code of RecvByteBufAllocator and showed how Netty receives data sent by the peer, and how AdaptiveRecvByteBufAllocator adaptively and dynamically allocates ByteBuf to solve the problem that a Java ByteBuffer allocated too large wastes memory, while one allocated too small requires frequent reallocation.

This article looks at how Netty sends data out.

Prerequisites

The first thing to know is that Netty only supports two data types for sending: ByteBuf and FileRegion. The former can be thought of as a ByteBuffer, i.e. ordinary byte data; the latter is for file transfer, and Netty uses FileRegion to achieve zero-copy file transfer.

Write and Flush

write() does not send any data; it merely stages it in the ChannelOutboundBuffer. flush() actually transmits the data over the Socket to the peer. writeAndFlush() simply performs the two operations in sequence.

When a program writes a large amount of data, or when flush() cannot drain it because the peer reads slowly, the TCP buffer is full, or the network misbehaves, a large number of messages pile up in the ChannelOutboundBuffer and can eventually exhaust memory. To protect the program, Netty gives each Channel a "high and low water mark": once the backlog exceeds the high water mark, the Channel is set to the "unwritable" state and the channelWritabilityChanged callback is triggered, and you can use channel.isWritable() to decide whether to keep writing. The water marks are configured via ChannelConfig.setWriteBufferHighWaterMark() and ChannelConfig.setWriteBufferLowWaterMark().
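
The following is a minimal sketch, not taken from this article, of how the water marks and the writability callback might be used together; the handler name and the 8 KB / 32 KB values are illustrative assumptions.

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.WriteBufferWaterMark;

// Hypothetical handler demonstrating water marks and the writability callback
public class BackpressureAwareHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        // Low water mark 8 KB, high water mark 32 KB (illustrative values)
        ctx.channel().config().setWriteBufferWaterMark(new WriteBufferWaterMark(8 * 1024, 32 * 1024));
        ctx.fireChannelActive();
    }

    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) {
        if (ctx.channel().isWritable()) {
            // Backlog dropped below the low water mark: safe to resume writing
        } else {
            // Backlog exceeded the high water mark: pause writing for now
        }
        ctx.fireChannelWritabilityChanged();
    }
}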

Subscribe to OP_WRITE events

Why subscribe to a Channel's OP_WRITE event when the write() operation is initiated by the user? Because the TCP buffer might be full: in that case you subscribe to the OP_WRITE event, stop writing for a while, and resume once the Selector tells you the Channel is writable again.
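
To make the idea concrete, here is a minimal plain-NIO sketch (an illustration, not Netty's actual code) of what "subscribe to OP_WRITE, then resume when writable" means:

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;

final class OpWriteExample {
    // Try to write; if the kernel send buffer is full, ask the Selector for OP_WRITE.
    static void writeOrWait(SelectionKey key, ByteBuffer data) throws IOException {
        SocketChannel ch = (SocketChannel) key.channel();
        ch.write(data);
        if (data.hasRemaining()) {
            // Could not write everything: listen for OP_WRITE and retry when the channel is writable
            key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
        } else {
            // Everything written: stop listening for OP_WRITE to avoid spinning
            key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
        }
    }
}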

The native Java SocketChannel only supports writing ByteBuffer. When you write a ByteBuf through Netty, it converts the ByteBuf to a ByteBuffer; the method used is ByteBuf.internalNioBuffer().
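
A small sketch of that conversion, assuming an already-connected JDK SocketChannel (an illustrative helper, not Netty internals):

import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

final class ByteBufWriter {
    // Expose the readable region of the ByteBuf as a ByteBuffer (no copy) and write it out
    static int writeTo(SocketChannel channel, ByteBuf buf) throws IOException {
        ByteBuffer nioBuffer = buf.internalNioBuffer(buf.readerIndex(), buf.readableBytes());
        return channel.write(nioBuffer);
    }
}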

It also helps to know the memory layout of Java objects in the JVM. When write(msg) is called, msg is wrapped into an Entry node and appended to the tail of a linked list. The Entry's pendingSize attribute records the memory the message occupies, which includes not only the msg data itself but also the space taken by the Entry object; that is why 96 is added by default. As a refresher: an object header takes at most 16 bytes, a reference takes at least 4 and at most 8 bytes, a long takes 8 bytes, an int takes 4 bytes, and a boolean takes 1 byte; in addition the JVM pads every object up to a multiple of 8 bytes. For Entry this works out to at most 16 + 6×8 + 2×8 + 2×4 + 1 = 89 bytes, padded up to 96.
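
If you want to verify such layout estimates yourself, the OpenJDK JOL tool can print an object's real layout. The sketch below is illustrative and assumes the org.openjdk.jol:jol-core dependency; the Sample class is a hypothetical stand-in with the same field mix as Entry.

import org.openjdk.jol.info.ClassLayout;

public class EntryLayoutDemo {
    // Hypothetical stand-in: 6 references, 2 longs, 2 ints, 1 boolean, like the Entry described above
    static final class Sample {
        Object a, b, c, d, e, f;
        long l1, l2;
        int i1, i2;
        boolean flag;
    }

    public static void main(String[] args) {
        // Prints field offsets, padding and the total instance size on the current JVM
        System.out.println(ClassLayout.parseClass(Sample.class).toPrintable());
    }
}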

ChannelHandlerContext.writeAndFlush() analysis

Here is a simple example of sending ByteBuf and FileRegion, respectively:

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    // Send a hello
    ctx.writeAndFlush(Unpooled.wrappedBuffer("hello".getBytes()));
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    // Transfer an A.txt file
    RandomAccessFile accessFile = new RandomAccessFile("/disk/a.txt", "r");
    DefaultFileRegion region = new DefaultFileRegion(accessFile.getChannel(), 0, accessFile.length());
	ctx.writeAndFlush(region);
}
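
One hedged usage note on the examples above: writeAndFlush() is asynchronous and returns a ChannelFuture, so failures only surface if you check the future, for example with a listener (the error handling below is an illustrative policy, continuing the channelRead() context of the examples):

ChannelFuture future = ctx.writeAndFlush(Unpooled.wrappedBuffer("hello".getBytes()));
future.addListener((ChannelFutureListener) f -> {
    if (!f.isSuccess()) {
        // The write failed: log the cause and close the connection (example policy)
        f.cause().printStackTrace();
        f.channel().close();
    }
});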

I’ll start with the overall writeAndFlush process, with the actual sending details explained in the next section.

Calling ctx.writeAndFlush() and ctx.channel().writeAndFlush() makes the write event travel slightly different paths: the latter starts from the Pipeline's TailContext to find a handler that can process the write event. By default the event ends up being handled by HeadContext. The source code is as follows:

private void write(Object msg, boolean flush, ChannelPromise promise) {
    // Make sure the message sent is not empty
    ObjectUtil.checkNotNull(msg, "msg");
    try {
        if (isNotValidPromise(promise, true)) {
            ReferenceCountUtil.release(msg);
            // cancelled
            return;
        }
    } catch (RuntimeException e) {
        ReferenceCountUtil.release(msg);
        throw e;
    }

    // Find a Channel that can handle write events. By default, find HeadContext.
    final AbstractChannelHandlerContext next = findContextOutbound(flush ?
            (MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
    final Object m = pipeline.touch(msg, next);
    EventExecutor executor = next.executor();

    // If it is an EventLoop thread, execute it directly, otherwise submit a task for serial execution.
    if (executor.inEventLoop()) {
        if (flush) {
            // writeAndFlush() was called, so flush is true; this ends up calling HeadContext.invokeWriteAndFlush()
            next.invokeWriteAndFlush(m, promise);
        } else {
            next.invokeWrite(m, promise);
        }
    } else {
        final WriteTask task = WriteTask.newInstance(next, m, promise, flush);
        if (!safeExecute(executor, task, promise, m, !flush)) {
            task.cancel();
        }
    }
}

Next, HeadContext's invokeWriteAndFlush() is called, which performs write and flush in a single method:

void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
    if (invokeHandler()) {
        // Call write() with handler
        invokeWrite0(msg, promise);
        // Call flush() with handler
        invokeFlush0();
    } else {
        writeAndFlush(msg, promise);
    }
}

Looking first at invokeWrite0(), which calls HeadContext.write(). Since the write operation needs to interact with the underlying JDK API, it is handed over to the Channel's Unsafe:

 @Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
    // It needs to interact with the underlying JDK API, handed over to Unsafe.
    unsafe.write(msg, promise);
}

AbstractChannel.AbstractUnsafe.write() is then called. It first filters the data to be sent, since only ByteBuf and FileRegion are supported. It then estimates how much memory the data occupies, because once the backlog exceeds the Channel's high water mark the Channel is set to "unwritable" to prevent memory overflow. After these two steps, the message is added to the output buffer, ChannelOutboundBuffer.

@Override
public final void write(Object msg, ChannelPromise promise) {
    assertEventLoop();

    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null) {// outboundBuffer is created along with the Channel, usually not null.
        try {
            ReferenceCountUtil.release(msg);
        } finally {
            safeSetFailure(promise,
                    newClosedChannelException(initialCloseCause, "write(Object, ChannelPromise)"));
        }
        return;
    }

    int size;
    try {
        // Filter write messages to make sure they are ByteBuf or FileRegion, other objects do not support write.
        msg = filterOutboundMessage(msg);
        /* write() does not send the message to the Socket; it stays in memory until flush().
           To prevent messages from piling up indefinitely, Netty tracks their total size against the
           high and low water marks and marks the Channel "unwritable" once the high water mark is
           exceeded, protecting the program from memory overflow.
           See io.netty.channel.DefaultMessageSizeEstimator.HandleImpl.size(): for FileRegion it
           returns 0 directly, because zero-copy transfer never loads the file into the JVM process. */
        size = pipeline.estimatorHandle().size(msg);
        if (size < 0) {
            size = 0;
        }
    } catch (Throwable t) {
        try {
            ReferenceCountUtil.release(msg);
        } finally {
            safeSetFailure(promise, t);
        }
        return;
    }

    // Write () will only temporarily store the message in the outboundBuffer, but will not actually send it.
    outboundBuffer.addMessage(msg, size, promise);
}

Take a look at filterOutboundMessage(), which, in addition to filtering messages, attempts to convert a HeapByteBuf into a DirectByteBuf. To improve sending efficiency, Netty uses direct memory for data read from and written to Sockets, avoiding an extra memory copy.

// Filter outbound messages. Only ByteBuf and FileRegion are supported.
@Override
protected final Object filterOutboundMessage(Object msg) {
    if (msg instanceof ByteBuf) {
        ByteBuf buf = (ByteBuf) msg;
        if (buf.isDirect()) {
            return msg;
        }
        // To avoid memory replication, the Socket uses out-of-heap memory for data read and written directly
        return newDirectBuffer(buf);
    }

    // File transfer
    if (msg instanceof FileRegion) {
        return msg;
    }
    // Throw an exception for unsupported data types
    throw new UnsupportedOperationException(
        "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
}

newDirectBuffer() does not guarantee the conversion. If the ByteBufAllocator in use is not pooled and io.netty.threadLocalDirectBufferSize is not set, Netty would have to allocate an unpooled DirectByteBuf, which is very expensive, so it gives up the conversion:

// Try to convert HeapByteBuf to DirectByteBuf if the conversion cost is too high.
protected final ByteBuf newDirectBuffer(ByteBuf buf) {
    final int readableBytes = buf.readableBytes();
    if (readableBytes == 0) {
        // The number of bytes readable is 0, and the shared empty object is released directly and returned.
        ReferenceCountUtil.safeRelease(buf);
        return Unpooled.EMPTY_BUFFER;
    }

    // Get the ByteBufAllocator bound to the Channel
    final ByteBufAllocator alloc = alloc();
    if (alloc.isDirectBufferPooled()) {// Is the allocator pooled and able to allocate direct memory?
        // Create a direct memory ByteBuf of the specified size, write the data, and free the original buf
        ByteBuf directBuf = alloc.directBuffer(readableBytes);
        directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
        ReferenceCountUtil.safeRelease(buf);
        return directBuf;
    }

    /* If io.netty.threadLocalDirectBufferSize is set, Netty maintains a lightweight per-thread ByteBuf
       object pool backed by a Stack stored in a FastThreadLocal. After such a ByteBuf has been written
       to the Socket it is released automatically and pushed back onto the thread-bound Stack for reuse. */
    final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
    if (directBuf != null) {
        directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
        ReferenceCountUtil.safeRelease(buf);
        return directBuf;
    }

    /* Allocating unpooled direct memory for a ByteBuf is very expensive (tests show it can be more than
       10 times slower than heap memory), so give up the conversion and return the original buffer. */
    return buf;
}

If io.netty.threadLocalDirectBufferSize is set, Netty caches a specified number of ByteBuf objects per thread so they can be reused: it stores a Stack in a FastThreadLocal, pop()s a ByteBuf when one is needed, and push()es it back after use.

The MessageSizeEstimator is responsible for estimating the memory footprint of the data to be sent. The logic is simple, and it returns 0 for FileRegion because FileRegion transfers files with zero copy (FileChannel.transferTo()), so the file never needs to be loaded into the JVM process. The implementation is io.netty.channel.DefaultMessageSizeEstimator.HandleImpl.size():

// The logic is simple to estimate the memory footprint of the message.
@Override
public int size(Object msg) {
    if (msg instanceof ByteBuf) {
        return ((ByteBuf) msg).readableBytes();
    }
    if (msg instanceof ByteBufHolder) {
        return ((ByteBufHolder) msg).content().readableBytes();
    }
    // FileRegion implements zero copy and does not need to load files into the JVM, so the memory footprint is zero and does not affect the Channel water mark.
    if (msg instanceof FileRegion) {
        return 0;
    }
    return unknownSize;
}

The ChannelOutboundBuffer code will be covered in more detail in the next section; for now, just note that write() only stages data in the ChannelOutboundBuffer and does not actually send it.

Once the message is saved to the ChannelOutboundBuffer, the write operation is complete. invokeFlush0() is then called; it is again delegated to the Unsafe, which calls AbstractChannel.AbstractUnsafe.flush(). This does two things: first it marks the pending entries in the ChannelOutboundBuffer as flushed, then it converts the outgoing Entry data into Java ByteBuffers and uses the SocketChannel to actually send them.

@Override
public final void flush() {
    assertEventLoop();

    // Get the ChannelOutboundBuffer bound to SocketChannel
    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null) {
        return;
    }
    // First mark the unflushed nodes as flushed
    outboundBuffer.addFlush();
    // Start sending data
    flush0();
}

flush0() starts sending the data. It first checks whether the Channel is active: if it is inactive, the flush fails and the entries are released; if the Channel is healthy, doWrite() is called to send the data.

protected void flush0() {
    if (inFlush0) {// Avoid re-entering while the previous flush0() call is still executing
        return;
    }

    final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null || outboundBuffer.isEmpty()) {// Nothing to flush
        return;
    }

    inFlush0 = true;

    // The connection has been deactivated.
    if (!isActive()) {
        try {
            if (!outboundBuffer.isEmpty()) {
                if (isOpen()) {
                    /* The Channel is still open and may become active later. failFlushed() will:
                       1. release the msg  2. notify the promise of the failure  3. recycle the Entry
                       4. decrement the pending bytes, which may trigger channelWritabilityChanged() */
                    outboundBuffer.failFlushed(new NotYetConnectedException(), true);
                } else {
                    /* The Channel is already closed; handled like above, except that the
                       channelWritabilityChanged() callback no longer needs to be triggered. */
                    outboundBuffer.failFlushed(newClosedChannelException(initialCloseCause, "flush0()"), false);
                }
            }
        } finally {
            inFlush0 = false;
        }
        return;
    }

    try {
        // The connection is fine; perform the real write
        doWrite(outboundBuffer);
    } catch (Throwable t) {
        handleWriteError(t);
    } finally {
        inFlush0 = false;
    }
}

doWrite() is the core of data sending and is implemented by the subclass, i.e. NioSocketChannel.doWrite(). It obtains the native Java SocketChannel, converts the ByteBufs in the queue into ByteBuffers, and then sends the data in a loop. The amount of data sent in a single pass is limited by two conditions:

  1. The number of ByteBuffers converted per pass (1024 by default).
  2. The size of the send buffer set by the TCP parameter (ChannelOption.SO_SNDBUF).

If the ChannelOutboundBuffer has a large backlog, a single pass may not be able to send everything, so the write loop runs 16 times by default; looping too many times would block the I/O thread and leave events on other Channels unprocessed.
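
Both limits can be tuned through channel options if needed; a minimal sketch, assuming a server bootstrap and purely illustrative values:

ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.childOption(ChannelOption.WRITE_SPIN_COUNT, 8)      // at most 8 write loops per flush (default is 16)
         .childOption(ChannelOption.SO_SNDBUF, 64 * 1024);     // TCP send buffer size hint, in bytes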

@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    SocketChannel ch = javaChannel();
    // The number of write cycles. The default value is 16. There may be a large backlog of messages in the output buffer, and a number of times is limited to avoid blocking the IO thread.
    int writeSpinCount = config().getWriteSpinCount();
    do {
        if (in.isEmpty()) {// There is no more data to write
            // Cancel listening for OP_WRITE events
            clearOpWrite();
            return;
        }

        // The maximum value of the send buffer, set by the TCP parameter channeloption.so_sndbuf.
        int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite();

        /* Convert the entries to be flushed into an array of Java native ByteBuffers. Because of the
           limits on buffer count and total bytes, it may not be possible to send all data at once.
           Note that only ByteBuf is handled here, not FileRegion. */
        ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);
        // The number of ByteBuffers actually filled in by the call above (not necessarily nioBuffers.length)
        int nioBufferCnt = in.nioBufferCount();

        switch (nioBufferCnt) {
            case 0:
                // ByteBuf is done, but there may be FileRegion that needs to be processed.
                writeSpinCount -= doWrite0(in);
                break;
            case 1: {
                // Only one ByteBuf needs to be sent
                ByteBuffer buffer = nioBuffers[0];
                // Number of bytes tried to send
                int attemptedBytes = buffer.remaining();
                // Java native socketChannel. write (ByteBuffer) to send data
                final int localWrittenBytes = ch.write(buffer);
                if (localWrittenBytes <= 0) {// TCP buffer is full, subscribes to OP_WRITE event, waits until writable to continue processing
                    incompleteWrite(true);
                    return;
                }
                // Dynamically resize the send buffer
                adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite);
                // Delete the Entry node that has been sent
                in.removeBytes(localWrittenBytes);
                --writeSpinCount;
                break;
            }
            default: {
                // The send buffer has multiple bytebuFs waiting to be sent
                // Total number of bytes attempted to be sent
                long attemptedBytes = in.nioBufferSize();
                // Call Java native SocketChannel.write() to send data, returning the number of bytes actually sent
                final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
                if (localWrittenBytes <= 0) {
                    // Write bytes are 0, maybe TCP buffer is full, subscribe OP_WRITE event, wait for TCP to write again.
                    incompleteWrite(true);
                    return;
                }
                // Dynamically adjust the send buffer to channeloption.so_sndbuf according to the number of bytes actually written
                adjustMaxBytesPerGatheringWrite((int) attemptedBytes, (int) localWrittenBytes,
                        maxBytesPerGatheringWrite);
                /* Remove sent data according to the number of bytes actually written, not the number of
                   ByteBufs. Starting from flushedEntry, the size of each ByteBuf is compared and fully
                   written ones are removed; a ByteBuf that was only partially sent just has its
                   readerIndex adjusted. */
                in.removeBytes(localWrittenBytes);
                --writeSpinCount;
                break;
            }
        }
    } while (writeSpinCount > 0);

    /* writeSpinCount < 0 can only happen when nioBufferCnt was 0 and doWrite0(in) handled a FileRegion
       that could not finish because the send buffer is full. In that case subscribe to the OP_WRITE
       event and wait until the Channel becomes writable. */
    incompleteWrite(writeSpinCount < 0);
}

In addition, NioSocketChannel.doWrite() only sends ByteBuf; FileRegion delivery needs to be handled by the superclass, in AbstractNioByteChannel.doWrite0().

/* NioSocketChannel.doWrite() is only responsible for sending ByteBuf; FileRegion delivery is handled
   here, in the superclass AbstractNioByteChannel. */
protected final int doWrite0(ChannelOutboundBuffer in) throws Exception {
    Object msg = in.current();
    if (msg == null) {
        // Directly return here so incompleteWrite(...) is not called.
        return 0;
    }
    // Send the current message
    return doWriteInternal(in, in.current());
}

private int doWriteInternal(ChannelOutboundBuffer in, Object msg) throws Exception {
    if (msg instanceof ByteBuf) {
        ByteBuf buf = (ByteBuf) msg;
        if (!buf.isReadable()) {
            // Nothing left to read, remove the node
            in.remove();
            return 0;
        }

        // Write the ByteBuf to the underlying SocketChannel
        final int localFlushedAmount = doWriteBytes(buf);
        if (localFlushedAmount > 0) {
            in.progress(localFlushedAmount);
            if (!buf.isReadable()) {
                in.remove();
            }
            return 1;
        }
    } else if (msg instanceof FileRegion) {
        FileRegion region = (FileRegion) msg;
        // Bytes already transferred >= total bytes: the file has been fully transferred, remove the node.
        if (region.transferred() >= region.count()) {
            in.remove();
            return 0;
        }

        // File transfer via transferTo(javaChannel(), position)
        long localFlushedAmount = doWriteFileRegion(region);
        if (localFlushedAmount > 0) {// The number of bytes actually sent
            in.progress(localFlushedAmount);
            if (region.transferred() >= region.count()) {// The FileRegion is fully sent, remove the node
                in.remove();
            }
            return 1;
        }
    } else {
        // Should not reach here.
        throw new Error();
    }
    /* Nothing could be written: return WRITE_STATUS_SNDBUF_FULL (Integer.MAX_VALUE), which drives
       writeSpinCount below 0, so the caller subscribes to the OP_WRITE event and waits for the Channel
       to become writable. */
    return WRITE_STATUS_SNDBUF_FULL;
}

Note that a flush() can end in one of three ways:

  1. Data is successfully sent.
  2. The maximum number of cycles has been exceeded before the data is sent out. In order not to block the IO thread, the data will be processed next time.
  3. The TCP buffer was full and no data could be sent.

In the latter two cases the write is "incomplete", so incompleteWrite(setOpWrite) is called to continue processing later. In the third case Netty also needs to subscribe to the OP_WRITE event and wait until the Selector reports that the Channel is writable before it continues sending. The setOpWrite parameter indicates whether to listen for OP_WRITE events:

/**
 * Incomplete write.
 * @param setOpWrite whether to subscribe to the OP_WRITE event
 */
protected final void incompleteWrite(boolean setOpWrite) {
    // If setOpWrite is true, the TCP buffer is full and the OP_WRITE event needs to be subscribed until the Channel becomes writable.
    if (setOpWrite) {
        // Subscribe to the OP_WRITE event
        setOpWrite();
    } else {
        // Unsubscribe the OP_WRITE event
        clearOpWrite();

        // Submit a flush task to execute later to avoid blocking the IO thread.
        eventLoop().execute(flushTask);
    }
}

At this point the writeAndFlush() flow has been covered. The ChannelOutboundBuffer internals were skipped above; they are analyzed in the next section.

ChannelOutboundBuffer source code analysis

The ChannelOutboundBuffer is a Netty data send buffer that is created along with a SocketChannel.

First look at attributes:

/* The extra bytes added when a ByteBuf is wrapped into an Entry, because the Entry object takes up
   space in addition to the ByteBuf data itself.
   Why 96, and why is it configurable?
   1. 96 is Netty's estimate of the maximum Entry size on a 64-bit JVM.
   2. If your program runs on a 32-bit JVM, or compressed object references are enabled, you can lower
      this value as needed.
   Why at most 96 bytes? On a 64-bit JVM an Entry occupies:
   - object header: 16 bytes
   - 6 object references: at least 4*6=24 bytes, at most 8*6=48 bytes
   - 2 longs: 2*8=16 bytes
   - 2 ints: 2*4=8 bytes
   - 1 boolean: 1 byte
   - padding: the JVM rounds object sizes up to a multiple of 8 bytes, here up to 7 bytes
   which adds up to at most 96 bytes. */
static final int CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD =
        SystemPropertyUtil.getInt("io.netty.transport.outboundBufferEntrySizeOverhead", 96);

/* Writes are very frequent, so to avoid creating arrays over and over this array is reused:
   each thread reuses its own ByteBuffer[]. */
private static final FastThreadLocal<ByteBuffer[]> NIO_BUFFERS = new FastThreadLocal<ByteBuffer[]>() {
    @Override
    protected ByteBuffer[] initialValue() throws Exception {
        // The default size is 1024; it can be expanded later if necessary
        return new ByteBuffer[1024];
    }
};

// The bound SocketChannel
private final Channel channel;

// Head of the nodes that have been flushed and are waiting to be sent.
private Entry flushedEntry;

// Head of the nodes that have been written but not yet flushed; flush() walks the list from here.
private Entry unflushedEntry;

// Tail node of the list
private Entry tailEntry;

// The number of nodes, starting at flushedEntry, marked as flushed but not yet sent.
private int flushed;

// The number of NIO buffers converted for a single write loop
private int nioBufferCount;
// The total size of the NIO buffers converted for a single write loop
private long nioBufferSize;
// Whether a flush failure is currently being handled
private boolean inFail;

// Computes the offset of the totalPendingSize field so it can be updated via CAS.
private static final AtomicLongFieldUpdater<ChannelOutboundBuffer> TOTAL_PENDING_SIZE_UPDATER =
        AtomicLongFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "totalPendingSize");

// The total memory occupied by the messages staged in the output buffer. It is compared against the
// high and low water marks to decide whether to change the Channel's writable state.
@SuppressWarnings("UnusedDeclaration")
private volatile long totalPendingSize;

// The offset of the unwritable field, updated via CAS.
private static final AtomicIntegerFieldUpdater<ChannelOutboundBuffer> UNWRITABLE_UPDATER =
        AtomicIntegerFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "unwritable");

// The Channel's writable state: 0 writable, 1 unwritable. Changed when the buffered bytes cross the
// high/low water marks.
@SuppressWarnings("UnusedDeclaration")
private volatile int unwritable;

// A task that fires channelWritabilityChanged when the writable state changes, i.e. when the backlog
// crosses the high/low water marks.
private volatile Runnable fireChannelWritabilityChangedTask;

ChannelOutboundBuffer is itself a singly linked list made up of Entry nodes, and it maintains three node pointers:

  • flushedEntry: points to the first node that has been flushed and is waiting to be sent.
  • unflushedEntry: points to the first node that has been written but not yet flushed.
  • tailEntry: points to the tail of the list.

The list therefore looks roughly like: flushedEntry → ... → unflushedEntry → ... → tailEntry, with flushed nodes at the front and unflushed nodes behind. As shown in the previous section, the write operation only stages the data in the ChannelOutboundBuffer; addMessage() does two main things:

  1. Wrap the msg into an Entry node and append it to the tail of the list.
  2. Check whether the total bytes of messages staged in the output buffer have reached the high water mark; if so, set the Channel to "unwritable" and trigger the channelWritabilityChanged callback.

/**
 * Stores the message in the ChannelOutboundBuffer; the promise is notified once the write succeeds.
 * @param msg     the data to send: ByteBuf/FileRegion
 * @param size    the memory size occupied by the data
 * @param promise notified when the write completes
 */
public void addMessage(Object msg, int size, ChannelPromise promise) {
    // Wrap MSG into an Entry and add it to the end of the chain.
    Entry entry = Entry.newInstance(msg, size, total(msg), promise);
    if (tailEntry == null) {
        flushedEntry = null;
    } else {
        // tail is not empty, add to its next
        Entry tail = tailEntry;
        tail.next = entry;
    }
    tailEntry = entry;// tailEntry points to the newly added node
    if (unflushedEntry == null) {
        unflushedEntry = entry;
    }

    // Count the number of bytes pending messages. If the high watermark is exceeded, change the writable state of the Channel and trigger a callback
    incrementPendingOutboundBytes(entry.pendingSize, false);
}

Look at Entry.newInstance(), which wraps the msg into an Entry node that is then appended to the tail of the list. The Entry's pendingSize attribute records the memory space occupied by the message: besides the data of the msg itself, it also includes the space occupied by the Entry object. The size of a Java object is fixed for a given JVM, which is where the memory-layout background from earlier comes in.

/**
 * Creates an Entry node, taking one from the object pool.
 * @param msg     the message itself
 * @param size    the memory size of the message as estimated by the MessageSizeEstimator
 * @param total   the size of the message itself (the two differ only in how FileRegion is handled)
 * @param promise notified when the write() completes
 * @return the Entry
 */
static Entry newInstance(Object msg, int size, long total, ChannelPromise promise) {
    /* Every write() needs an Entry. Since write() is a very frequent operation, an object pool is used
       to avoid creating and destroying Entry instances all the time. */
    Entry entry = RECYCLER.get();
    entry.msg = msg;
    /* pendingSize = size + CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD: besides the space taken by the
       ByteBuf, the Entry object itself takes space. On a 64-bit JVM:
       - object header: 16 bytes
       - 6 object references: at least 4*6=24 bytes, at most 8*6=48 bytes
       - 2 longs: 2*8=16 bytes
       - 2 ints: 2*4=8 bytes
       - 1 boolean: 1 byte
       - padding: the object size is rounded up to a multiple of 8 bytes, here up to 7 bytes
       which adds up to at most 96 bytes, hence the default value of CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD. */
    entry.pendingSize = size + CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD;
    entry.total = total;
    entry.promise = promise;
    return entry;
}

After the Entry is appended to the list, incrementPendingOutboundBytes() adds its size to the running total and checks whether the high water mark has been exceeded:

/**
 * Adds the message size to the pending byte count. If the high water mark is exceeded, the Channel's
 * writable state is changed and a callback is triggered.
 * @param size        the number of bytes of memory occupied by the message
 * @param invokeLater whether to trigger the callback later
 */
private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
    if (size == 0) {
        return;
    }
    // Accumulate the total memory staged by pending messages
    long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
    // The backlog has reached the high water mark
    if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
        // Change the writable state and trigger the callback
        setUnwritable(invokeLater);
    }
}

setUnwritable() fires when the total number of pending bytes exceeds the high water mark. It flips unwritable from 0 to 1 with a spin + CAS loop and then triggers the callback:

// Set Channel to unwritable, CAS executes
private void setUnwritable(boolean invokeLater) {
    for (;;) {
        final int oldValue = unwritable;
        final int newValue = oldValue | 1;
        if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
            if (oldValue == 0) {
                // The CAS operation succeeded, triggering the callback
                fireChannelWritabilityChanged(invokeLater);
            }
            break;
        }
    }
}

That covers the write side of the ChannelOutboundBuffer; now for flush.

When flushing, outboundBuffer.addFlush() is called first to mark the unflushed nodes as flushed, i.e. to adjust the flushedEntry and unflushedEntry pointers. While walking the list it also checks whether an Entry has been cancelled: a cancelled Entry is skipped later, and the memory it occupied is subtracted from the pending total.

// Only marks the unflushed nodes as flushed; no data is sent yet. Cancelled nodes can be skipped later.
public void addFlush() {
    Entry entry = unflushedEntry;
    if (entry != null) {
        if (flushedEntry == null) {
            flushedEntry = entry;
        }
        do {
            flushed ++;
            // Mark the entry's promise as uncancellable
            if (!entry.promise.setUncancellable()) {
                // Setting it failed: the promise was already cancelled, so release the message
                // and decrement the pending byte count
                int pending = entry.cancel();
                // Decrement the total pending bytes; if the total drops to the low water mark,
                // set the Channel back to writable and trigger the callback
                decrementPendingOutboundBytes(pending, false, true);
            }
            entry = entry.next;
        } while (entry != null);// Keep walking the nodes to flush

        // All nodes have been marked as flushed; clear unflushedEntry
        unflushedEntry = null;
    }
}

After the nodes are marked, doWrite() starts writing the data. First the ChannelOutboundBuffer has to convert the flushed nodes into native Java ByteBuffers, which is what nioBuffers() does. Because there is a limit on how many bytes SocketChannel.write() can send at a time (at most Integer.MAX_VALUE), the conversion takes two arguments:

  • maxCount: the maximum number of ByteBuffers to convert, 1024 by default.
  • maxBytes: the maximum number of bytes, by default the size of the TCP send buffer (ChannelOption.SO_SNDBUF).
/**
 * Converts the entries to be flushed into an array of Java native ByteBuffers. Because of the count and
 * byte limits, it may not be possible to send all data at once.
 * @param maxCount the maximum number of ByteBuffers converted per call
 * @param maxBytes the maximum number of bytes, set by the TCP parameter ChannelOption.SO_SNDBUF
 * @return the ByteBuffer array
 */
public ByteBuffer[] nioBuffers(int maxCount, long maxBytes) {
    assert maxCount > 0;
    assert maxBytes > 0;
    long nioBufferSize = 0;
    int nioBufferCount = 0;
    /* Because write operations are so frequent, creating a ByteBuffer[] on every call would be wasteful;
       the array is reused instead, and each thread has its own ByteBuffer[1024]. */
    final InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
    ByteBuffer[] nioBuffers = NIO_BUFFERS.get(threadLocalMap);

    Entry entry = flushedEntry;
    // Ensure that Entry is flushed and MSG is typed as ByteBuf
    while (isFlushedEntry(entry) && entry.msg instanceof ByteBuf) {
        // Make sure the node is not cancelled, and if it is, skip it.
        if (!entry.cancelled) {
            ByteBuf buf = (ByteBuf) entry.msg;
            final int readerIndex = buf.readerIndex();
            final int readableBytes = buf.writerIndex() - readerIndex;

            // The number of bytes readable is the number of bytes to write, make sure it is greater than 0
            if (readableBytes > 0) {
                if (maxBytes - readableBytes < nioBufferSize && nioBufferCount != 0) {
                    // Send more data than maxBytes, exit the loop
                    break;
                }
                nioBufferSize += readableBytes;
                int count = entry.count;
                if (count == -1) {
                    // -1 means the count has not been computed yet; ask the ByteBuf how many ByteBuffers it consists of
                    entry.count = count = buf.nioBufferCount();
                }
                // How many array slots are needed
                int neededSpace = min(maxCount, nioBufferCount + count);
                // If the number of bytebuffers exceeds the default value of 1024, request more space
                if (neededSpace > nioBuffers.length) {
                    // Multiply until the array is long enough. And plug back into FastThreadLocal.
                    nioBuffers = expandNioBufferArray(nioBuffers, neededSpace, nioBufferCount);
                    NIO_BUFFERS.set(threadLocalMap, nioBuffers);
                }
                if (count == 1) {
                    ByteBuffer nioBuf = entry.buf;
                    if (nioBuf == null) {
                        // Convert ByteBuf to ByteBuffer and cache it into Entry
                        entry.buf = nioBuf = buf.internalNioBuffer(readerIndex, readableBytes);
                    }
                    // Store the ByteBuffer in the array
                    nioBuffers[nioBufferCount++] = nioBuf;
                } else {
                    // A ByteBuf contains multiple ByteBuffers, iterating through the Settings
                    nioBufferCount = nioBuffers(entry, buf, nioBuffers, nioBufferCount, maxCount);
                }
                if (nioBufferCount >= maxCount) {
                    // The number of bytebuffers exceeds maxCount and exits the loop
                    break;
                }
            }
        }
        entry = entry.next;
    }
    this.nioBufferCount = nioBufferCount;
    this.nioBufferSize = nioBufferSize;

    return nioBuffers;
}

Focus here on the NIO_BUFFERS property, a FastThreadLocal: each thread has its own reusable ByteBuffer[] cache with a default length of 1024. Why reuse it? As a network IO framework, Netty flushes extremely often; reusing the array avoids allocating a new ByteBuffer[] every time, improving performance and reducing GC pressure.
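
The reuse pattern itself is easy to replicate. A minimal sketch (illustrative, not Netty's internal code) of a per-thread scratch array held in a FastThreadLocal:

import io.netty.util.concurrent.FastThreadLocal;
import java.nio.ByteBuffer;

final class ThreadLocalScratch {
    private static final FastThreadLocal<ByteBuffer[]> SCRATCH = new FastThreadLocal<ByteBuffer[]>() {
        @Override
        protected ByteBuffer[] initialValue() {
            // Allocated once per thread, then reused on every call
            return new ByteBuffer[1024];
        }
    };

    static ByteBuffer[] get() {
        // Each thread sees and reuses its own array instance
        return SCRATCH.get();
    }
}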

If a ByteBuf is composed of multiple ByteBuffers, the default capacity of 1024 may not be enough, and expandNioBufferArray() is called to grow the array:

// Expand the array capacity
private static ByteBuffer[] expandNioBufferArray(ByteBuffer[] array, int neededSpace, int size) {
    int newCapacity = array.length;
    do {
        // Multiply
        newCapacity <<= 1;

        if (newCapacity < 0) {// int overflow
            throw new IllegalStateException();
        }
    } while (neededSpace > newCapacity);

    ByteBuffer[] newArray = new ByteBuffer[newCapacity];
    // Element migration
    System.arraycopy(array, 0, newArray, 0, size);

    return newArray;
}

After converting the outgoing ByteBufs to ByteBuffers, NioSocketChannel calls the JDK's SocketChannel.write() to actually send the data.

After the data is sent, the corresponding nodes are removed from the ChannelOutboundBuffer. Nodes are appended at the tail and removed from the head, and removal is driven by the number of bytes actually sent. A ByteBuf may therefore have sent only part of its data; in that case its node is not removed, its readerIndex is advanced instead, and the remaining bytes are sent next time.

/**
 * Removes ByteBufs according to the number of bytes actually written to the TCP buffer.
 * @param writtenBytes the number of bytes written
 */
public void removeBytes(long writtenBytes) {
    for (;;) {
        // count from flushedEntry
        Object msg = current();
        if (!(msg instanceof ByteBuf)) {
            assert writtenBytes == 0;
            break;
        }

        final ByteBuf buf = (ByteBuf) msg;
        final int readerIndex = buf.readerIndex();
        final int readableBytes = buf.writerIndex() - readerIndex;
        // If the data of a single ByteBuf is <= writtenBytes, the Entry node is removed directly
        if (readableBytes <= writtenBytes) {
            if (writtenBytes != 0) {
                progress(readableBytes);
                writtenBytes -= readableBytes;
            }
            remove();
        } else { // readableBytes > writtenBytes
            // If a ByteBuf is sending part of the data, adjust its readerIndex and continue sending next time.
            if (writtenBytes != 0) {
                buf.readerIndex(readerIndex + (int) writtenBytes);
                progress(writtenBytes);
            }
            break;
        }
    }
    // Reset NIO_BUFFERS
    clearNioBuffers();
}

At this point, the core flow of sending data in Netty has been fully analyzed.

Conclusion

To avoid writing to the TCP buffer on every single write, Netty's Channel offers two operations, write and flush, which both rely on a core class: ChannelOutboundBuffer. write only stages data in the buffer; flush actually sends it. In addition, Netty provides high and low water marks to avoid OOM caused by too many pending messages: when the backlog reaches the high water mark, Netty sets the Channel to the unwritable state and triggers a callback, and users can check that state to decide whether to keep writing.

ChannelOutboundBuffer itself is a singly linked list that manages the staged messages, and it converts ByteBuf to ByteBuffer when data needs to be sent, since the JDK's underlying SocketChannel only supports writing ByteBuffers.

Once data has been sent, the ChannelOutboundBuffer removes Entry nodes according to the number of bytes actually written. If a ByteBuf was only partially sent, its node is not removed; instead its readerIndex is adjusted and the remaining data is sent next time.