Wangwei. One /posts/netty…

The previous chapters analyzed Netty's three core components (Channel, EventLoop, and Pipeline) and how they work together. Building on that, this chapter examines how Netty handles new connections once the server has started.

This article is divided into four parts:

  • New connection detection
  • NioSocketChannel creation
  • NioSocketChannel initialization and registration
  • NioSocketChannel registers the OP_READ interest set

New connection detection

New connection detection starts in NioEventLoop.processSelectedKey(), which was covered in the earlier EventLoop source code analysis:

public final class NioEventLoop extends SingleThreadEventLoop {
    ...
    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        ...
        try {
            ...
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                // Handle the read (or accept) event
                unsafe.read();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
        ...
    }
    ...
}
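
The bit test in processSelectedKey() can be tried in isolation with the JDK's SelectionKey constants. The sketch below is not Netty code; the names ReadyOpsCheck and shouldRead are made up for illustration.

```java
import java.nio.channels.SelectionKey;

final class ReadyOpsCheck {
    // Mirrors the condition shown above: read when OP_READ or OP_ACCEPT is
    // ready, or when readyOps is 0.
    static boolean shouldRead(int readyOps) {
        return (readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0;
    }

    public static void main(String[] args) {
        System.out.println(shouldRead(SelectionKey.OP_ACCEPT)); // true
        System.out.println(shouldRead(SelectionKey.OP_READ));   // true
        System.out.println(shouldRead(SelectionKey.OP_WRITE));  // false
        System.out.println(shouldRead(0));                      // true
    }
}
```

Because OP_ACCEPT is part of the mask, a server channel with a pending connection takes the same unsafe.read() path as a client channel with pending data.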

For the server-side NioServerSocketChannel, the bound unsafe instance is a NioMessageUnsafe, so unsafe.read() invokes NioMessageUnsafe.read():

public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
    ...
    private final class NioMessageUnsafe extends AbstractNioUnsafe {

        // Holds the newly created NioSocketChannels
        private final List<Object> readBuf = new ArrayList<Object>();

        @Override
        public void read() {
            // Make sure the current thread is the channel's EventLoop thread
            assert eventLoop().inEventLoop();
            // get the NioServerSocketChannel config configuration
            final ChannelConfig config = config();
            // get the NioServerSocketChannel binding pipeline
            final ChannelPipeline pipeline = pipeline();
            // Get RecvByteBuf allocator Handle
            // When a channel receives data, allocHandle is used to allocate ByteBuf to hold the data
            // allocHandle will be introduced in detail later
            final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
            // Resets all counters that have been accumulated and provides suggestions for how many messages/bytes of data to read in the next read cycle
            allocHandle.reset(config);
			
            boolean closed = false;
            Throwable exception = null;
            try {
                try {
                    do {
                        // Call the doReadMessages interface and return 1 if message is read
                        int localRead = doReadMessages(readBuf);
                        if (localRead == 0) {
                            break;
                        }
                        if (localRead < 0) {
                            closed = true;
                            break;
                        }
                        // Increment the number of messages read in the current read loop
                        allocHandle.incMessagesRead(localRead);
                        // Determine whether to continue reading message
                    } while (allocHandle.continueReading());
                } catch (Throwable t) {
                    exception = t;
                }
                
                int size = readBuf.size();
                for (int i = 0; i < size; i ++) {
                    readPending = false;
                    // Call pipeline to propagate the ChannelRead event
                    pipeline.fireChannelRead(readBuf.get(i));
                }
                // Clear readBuf
                readBuf.clear();
                allocHandle.readComplete();
                // Call pipeline to propagate ChannelReadComplete
                pipeline.fireChannelReadComplete();

                if (exception != null) {
                    closed = closeOnReadError(exception);
                    pipeline.fireExceptionCaught(exception);
                }

                if (closed) {
                    inputShutdown = true;
                    if (isOpen()) {
                        close(voidPromise());
                    }
                }
            } finally {
                // Check if there is a readPending which was not processed yet.
                // This could be for two reasons:
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
                //
                // See https://github.com/netty/netty/issues/2254
                if (!readPending && !config.isAutoRead()) {
                    removeReadOp();
                }
            }
        }
    }
    ...
}
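
The overall shape of read() (collect messages in a loop, then fire one channelRead per message, then channelReadComplete) can be modeled without Netty. The sketch below is a simplified stand-in: ReadLoopSketch, its string "connections", and the plain message cap used in place of continueReading() are assumptions made for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

final class ReadLoopSketch {
    // Hypothetical stand-in for doReadMessages(): moves at most one pending
    // connection into readBuf and returns how many were read (0 or 1).
    static int doReadMessages(Deque<String> pending, List<String> readBuf) {
        String conn = pending.poll();
        if (conn == null) {
            return 0;
        }
        readBuf.add(conn);
        return 1;
    }

    // Simplified shape of NioMessageUnsafe.read(): read in a loop, fire one
    // "channelRead" per message, clear the buffer, then fire "channelReadComplete".
    static List<String> read(Deque<String> pending, int maxMessagesPerRead) {
        List<String> readBuf = new ArrayList<>();
        List<String> fired = new ArrayList<>();
        int totalMessages = 0;
        do {
            int localRead = doReadMessages(pending, readBuf);
            if (localRead == 0) {
                break;
            }
            totalMessages += localRead;
        } while (totalMessages < maxMessagesPerRead); // simplified continue condition

        for (String msg : readBuf) {
            fired.add("channelRead:" + msg);
        }
        readBuf.clear();
        fired.add("channelReadComplete");
        return fired;
    }

    public static void main(String[] args) {
        Deque<String> pending = new ArrayDeque<>(Arrays.asList("conn-1", "conn-2", "conn-3"));
        // prints [channelRead:conn-1, channelRead:conn-2, channelReadComplete]
        System.out.println(read(pending, 2));
        // prints [conn-3]
        System.out.println(pending);
    }
}
```

Whatever is left in the pending queue is picked up the next time the selector reports the channel as ready.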

Now let's look at doReadMessages(...):

public class NioServerSocketChannel extends AbstractNioMessageChannel
        implements io.netty.channel.socket.ServerSocketChannel {
    ...
    // Read the message (here: accept a new connection)
    @Override
    protected int doReadMessages(List<Object> buf) throws Exception {
        // Accept the pending connection and get the JDK SocketChannel
        SocketChannel ch = SocketUtils.accept(javaChannel());

        try {
            if (ch != null) {
                // Wrap the SocketChannel in a NioSocketChannel and add it to buf
                // See the following analysis for the creation of NioSocketChannel
                buf.add(new NioSocketChannel(this, ch));
                return 1;
            }
        } catch (Throwable t) {
            logger.warn("Failed to create a new channel from an accepted socket.", t);

            try {
                ch.close();
            } catch (Throwable t2) {
                logger.warn("Failed to close a socket.", t2);
            }
        }

        return 0;
    }
    ...
}
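
The 1-or-0 return contract of doReadMessages() follows directly from how a non-blocking JDK ServerSocketChannel behaves: accept() returns a SocketChannel when a connection is pending and null otherwise. The sketch below uses only java.nio on the loopback interface; AcceptSketch and acceptOnce are hypothetical names, and nothing here is Netty's own code.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;

final class AcceptSketch {
    // Same contract as doReadMessages(): 1 if a connection was accepted, else 0.
    static int doReadMessages(ServerSocketChannel server, List<SocketChannel> buf) throws IOException {
        SocketChannel ch = server.accept(); // null in non-blocking mode when nothing is pending
        if (ch != null) {
            buf.add(ch);
            return 1;
        }
        return 0;
    }

    // Connects one client and returns {result of first call, result of second call}.
    static int[] acceptOnce() {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false);
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            server.register(selector, SelectionKey.OP_ACCEPT);

            InetSocketAddress address = (InetSocketAddress) server.getLocalAddress();
            try (SocketChannel client = SocketChannel.open(address)) {
                selector.select(5000); // wait until OP_ACCEPT is ready
                List<SocketChannel> buf = new ArrayList<>();
                int first = doReadMessages(server, buf);
                int second = doReadMessages(server, buf);
                for (SocketChannel accepted : buf) {
                    accepted.close();
                }
                return new int[] {first, second};
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] results = acceptOnce();
        System.out.println("first=" + results[0] + ", second=" + results[1]);
    }
}
```

With a single client, the first call accepts the connection and the second finds nothing pending.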

Next, look at continueReading(). Here it returns false after the first accepted connection; the reason will be analyzed separately later:

public abstract class DefaultMaxMessagesRecvByteBufAllocator implements MaxMessagesRecvByteBufAllocator {
	
    private volatile int maxMessagesPerRead;
    private volatile boolean respectMaybeMoreData = true;
    ...
    public abstract class MaxMessageHandle implements ExtendedHandle {
        private ChannelConfig config;
        // The maximum number of messages read at a time
        private int maxMessagePerRead;
        private int totalMessages;
        private int totalBytesRead;
        private int attemptedBytesRead;
        private int lastBytesRead;
        private final boolean respectMaybeMoreData = DefaultMaxMessagesRecvByteBufAllocator.this.respectMaybeMoreData;
        private final UncheckedBooleanSupplier defaultMaybeMoreSupplier = new UncheckedBooleanSupplier() {
            @Override
            public boolean get() {
                return attemptedBytesRead == lastBytesRead;
            }
        };
        ...
        // Determine whether to continue reading messages
        @Override
        public boolean continueReading() {
            return continueReading(defaultMaybeMoreSupplier);
        }
        
        // Determine whether to continue reading message
        @Override
        public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
            // config.isAutoRead() is true by default
            // respectMaybeMoreData defaults to true
            // maybeMoreDataSupplier.get() compares attemptedBytesRead with lastBytesRead;
            // both are still 0 here because accepting a connection reads no bytes
            // totalMessages is 1 after the first loop iteration
            // maxMessagePerRead is 16
            // totalBytesRead is still 0, so totalBytesRead > 0 fails and the method returns false
            return config.isAutoRead() &&
                   (!respectMaybeMoreData || maybeMoreDataSupplier.get()) &&
                   totalMessages < maxMessagePerRead &&
                   totalBytesRead > 0;
        }
        ...
    }
    ...
}
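
To see why the expression evaluates to false for a server channel, it helps to pull the condition out into a pure function. The sketch below copies only the boolean expression; ContinueReadingSketch and its parameter list are illustrative, and the argument values are assumptions standing in for the handle's fields.

```java
final class ContinueReadingSketch {
    // Same boolean expression as continueReading(UncheckedBooleanSupplier),
    // with the handle's fields passed in as plain parameters.
    static boolean continueReading(boolean autoRead, boolean respectMaybeMoreData, boolean maybeMoreData,
                                   int totalMessages, int maxMessagePerRead, int totalBytesRead) {
        return autoRead
                && (!respectMaybeMoreData || maybeMoreData)
                && totalMessages < maxMessagePerRead
                && totalBytesRead > 0;
    }

    public static void main(String[] args) {
        // After accepting one connection no bytes have been read, so the last operand fails.
        System.out.println(continueReading(true, true, true, 1, 16, 0));   // false
        // A channel that has read 512 bytes so far keeps going.
        System.out.println(continueReading(true, true, true, 1, 16, 512)); // true
    }
}
```

Whatever the other operands are, totalBytesRead > 0 cannot hold when nothing but a connection has been accepted, so the read loop stops after one iteration.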

NioSocketChannel creation

Creating a NioSocketChannel closely mirrors creating a NioServerSocketChannel.

Constructor

Let’s take a look at the NioSocketChannel constructor:

public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {
    ...
    public NioSocketChannel(Channel parent, SocketChannel socket) {
        // Call the parent constructor
        super(parent, socket);
        // Create the NioSocketChannelConfig
        config = new NioSocketChannelConfig(this, socket.socket());
    }
    ...
}

AbstractNioByteChannel constructor:

public abstract class AbstractNioByteChannel extends AbstractNioChannel {
    ...
    protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
        // Call the superclass constructor with SelectionKey.OP_READ as the read interest op,
        // i.e. this channel is interested in read events
        super(parent, ch, SelectionKey.OP_READ);
    }
    ...
}

AbstractNioChannel constructor:

public abstract class AbstractNioChannel extends AbstractChannel {
    ...
    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        // Call the parent constructor
        super(parent);
        // Store the underlying JDK channel
        this.ch = ch;
        // Store the read interest op
        this.readInterestOp = readInterestOp;
        try {
            // Set it to non-blocking
            ch.configureBlocking(false);
        } catch (IOException e) {
            ...
        }
    }
}

AbstractChannel constructor:

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
    ...
    protected AbstractChannel(Channel parent) {
        // Set the parent
        this.parent = parent;
        // Create the channel ID
        id = newId();
        // Create the unsafe
        unsafe = newUnsafe();
        // Create the pipeline
        pipeline = newChannelPipeline();
    }
    ...
}

ChannelConfig creation

Then let’s look at the NioSocketChannelConfig creation logic:

public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {
    ...
    private NioSocketChannelConfig(NioSocketChannel channel, Socket javaSocket) {
        // Call the parent constructor
        super(channel, javaSocket);
        calculateMaxBytesPerGatheringWrite();
    }
    ...
}

The parent class DefaultSocketChannelConfig constructor:

public class DefaultSocketChannelConfig extends DefaultChannelConfig implements SocketChannelConfig {
    ...
    public DefaultSocketChannelConfig(SocketChannel channel, Socket javaSocket) {
        // Call the parent constructor to bind the channel
        super(channel);
        if (javaSocket == null) {
            throw new NullPointerException("javaSocket");
        }
        // Bind the Java socket
        this.javaSocket = javaSocket;

        // Enable TCP_NODELAY by default if possible.
        // Netty usually runs on a server rather than on Android, so canEnableTcpNoDelayByDefault() returns true
        if (PlatformDependent.canEnableTcpNoDelayByDefault()) {
            try {
                // Enabling TCP_NODELAY disables Nagle's algorithm:
                // data in the send buffer is sent as soon as the send window allows, without waiting to batch it
                setTcpNoDelay(true);
            } catch (Exception e) {
                // Ignore.
            }
        }
    }
    ...
}
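
The effect of setTcpNoDelay(true) can be checked with the JDK alone. The sketch below sets the option through the Socket view of a SocketChannel, in the same way the config above receives socket.socket(); TcpNoDelaySketch and enableNoDelay are hypothetical names, and the channel is left unconnected because the option can be set before connecting.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.Socket;
import java.nio.channels.SocketChannel;

final class TcpNoDelaySketch {
    // Opens an unconnected channel, enables TCP_NODELAY through its Socket
    // view, and reports the value read back from the socket.
    static boolean enableNoDelay() {
        try (SocketChannel ch = SocketChannel.open()) {
            Socket javaSocket = ch.socket();
            javaSocket.setTcpNoDelay(true); // disables Nagle's algorithm
            return javaSocket.getTcpNoDelay();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("TCP_NODELAY enabled: " + enableNoDelay());
    }
}
```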

NioSocketChannel initialization and registration

The previous section covered how a NioSocketChannel is created. This section analyzes how it is then initialized and registered with a NioEventLoop.

Recall the following snippet from the new connection detection section:

private final class NioMessageUnsafe extends AbstractNioUnsafe {
    ...
    int size = readBuf.size();
    for (int i = 0; i < size; i ++) {
        readPending = false;
        // Call pipeline to propagate the ChannelRead event
        pipeline.fireChannelRead(readBuf.get(i));
    }
    ...
}

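This propagates the ChannelRead event through the pipeline. The pipeline here is the one bound to the server channel, NioServerSocketChannel, so the event eventually reaches ServerBootstrapAcceptor, whose channelRead() method is invoked.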

So where does this ServerBootstrapAcceptor come from?

Earlier, we analyzed NioServerSocketChannel initialization with the following code:

public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
    ...
    // NioServerSocketChannel initialization
    void init(Channel channel) throws Exception {
        // Get the options configured at startup, mainly TCP properties
        final Map<ChannelOption<?>, Object> options = options0();
        // Apply the options to the channel's ChannelConfig
        synchronized (options) {
            setChannelOptions(channel, options, logger);
        }

        // Get the attrs configured on ServerBootstrap at startup
        final Map<AttributeKey<?>, Object> attrs = attrs0();
        // Apply the attrs to the channel; these are mainly user-defined parameters
        synchronized (attrs) {
            for (Entry<AttributeKey<?>, Object> e : attrs.entrySet()) {
                @SuppressWarnings("unchecked")
                AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
                channel.attr(key).set(e.getValue());
            }
        }

        // Get the channel's pipeline, which was created together with the channel
        ChannelPipeline p = channel.pipeline();

        // Save the childGroup configured on the bootstrap to the local variable currentChildGroup
        final EventLoopGroup currentChildGroup = childGroup;
        // Save the childHandler configured on the bootstrap to the local variable currentChildHandler
        final ChannelHandler currentChildHandler = childHandler;
        final Entry<ChannelOption<?>, Object>[] currentChildOptions;
        final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
        // Save user-set childOptions to local variable currentChildOptions
        synchronized (childOptions) {
            currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
        }
        // Save user-set childAttrs to local variable currentChildAttrs
        synchronized (childAttrs) {
            currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
        }

        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) throws Exception {
                final ChannelPipeline pipeline = ch.pipeline();
                // Get the handler configured on the bootstrap
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    // Add handler to pipeline
                    pipeline.addLast(handler);
                }
                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        // Create the ServerBootstrapAcceptor for new connections, passing in the child-related arguments
                        // ServerBootstrapAcceptor binds each new connection to a thread:
                        // each time a new connection comes in, it configures the connection with the
                        // child-related attributes and registers it with the childGroup
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }
    ...
}

ServerBootstrapAcceptor

When NioServerSocketChannel is initialized, an inbound handler node, ServerBootstrapAcceptor, is added to the pipeline bound to NioServerSocketChannel. Its code is as follows:

public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
    ...
    private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {

        // Child EventLoopGroup, i.e. the worker group
        private final EventLoopGroup childGroup;
        // childHandler configured on ServerBootstrap at startup
        private final ChannelHandler childHandler;
        // childOptions configured on ServerBootstrap at startup
        private final Entry<ChannelOption<?>, Object>[] childOptions;
        // childAttrs configured on ServerBootstrap at startup
        private final Entry<AttributeKey<?>, Object>[] childAttrs;
        private final Runnable enableAutoReadTask;

        // Constructor
        ServerBootstrapAcceptor(
                final Channel channel, EventLoopGroup childGroup, ChannelHandler childHandler,
                Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {
            this.childGroup = childGroup;
            this.childHandler = childHandler;
            this.childOptions = childOptions;
            this.childAttrs = childAttrs;

            // Task which is scheduled to re-enable auto-read.
            // It's important to create this Runnable before we try to submit it as otherwise the URLClassLoader may
            // not be able to load the class because of the file limit it already reached.
            //
            // See https://github.com/netty/netty/issues/1328
            enableAutoReadTask = new Runnable() {
                @Override
                public void run() {
                    channel.config().setAutoRead(true);
                }
            };
        }

        // Handle channelRead events propagated through the pipeline
        // They are triggered by the code we saw earlier during new connection detection:
        // pipeline.fireChannelRead(readBuf.get(i));
        // ServerBootstrapAcceptor's channelRead() is then called to handle the event
        @Override
        @SuppressWarnings("unchecked")
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            // The message carried by the event is readBuf.get(i) from above,
            // i.e. the newly created NioSocketChannel
            final Channel child = (Channel) msg;
            // Add childHandler to the NioSocketChannel's pipeline. This is the handler from the
            // familiar ServerBootstrap startup code:
            // serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {... })
            // The end result is that the user-defined ChannelHandlers are added to the
            // NioSocketChannel's pipeline to handle the client connection
            child.pipeline().addLast(childHandler);
            // Configure TCP properties for NioSocketChannel
            setChannelOptions(child, childOptions, logger);
            // Set the user-defined attributes on NioSocketChannel
            for (Entry<AttributeKey<?>, Object> e : childAttrs) {
                child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
            }
            // Register NioSocketChannel with childGroup
            try {
                childGroup.register(child).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (!future.isSuccess()) {
                            forceClose(child, future.cause());
                        }
                    }
                });
            } catch (Throwable t) {
                forceClose(child, t);
            }
        }
        ...
    }
    ...
}
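
The three steps in channelRead() (add the child handler, apply the child options, hand the channel to a child event loop) can be modeled in a few lines of plain Java. Everything in the sketch below is a hypothetical stand-in: AcceptorSketch and ChildChannel are invented names, handlers and options are plain strings and maps, and the round-robin choice of loop is an assumption about how the child group picks an EventLoop.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class AcceptorSketch {
    // Hypothetical stand-in for a child channel (NioSocketChannel in Netty).
    static final class ChildChannel {
        final List<String> pipeline = new ArrayList<>();
        final Map<String, Object> options = new LinkedHashMap<>();
        int eventLoopIndex = -1; // -1 means "not registered yet"
    }

    private final int childLoopCount;
    private final String childHandler;
    private final Map<String, Object> childOptions;
    private int next; // round-robin cursor (assumed selection strategy)

    AcceptorSketch(int childLoopCount, String childHandler, Map<String, Object> childOptions) {
        this.childLoopCount = childLoopCount;
        this.childHandler = childHandler;
        this.childOptions = childOptions;
    }

    // Same three steps as the channelRead() shown above.
    void channelRead(ChildChannel child) {
        child.pipeline.add(childHandler);               // 1. add childHandler to the child's pipeline
        child.options.putAll(childOptions);             // 2. apply the child options
        child.eventLoopIndex = next++ % childLoopCount; // 3. register with one loop of the child group
    }

    public static void main(String[] args) {
        Map<String, Object> options = new LinkedHashMap<>();
        options.put("SO_KEEPALIVE", true);
        AcceptorSketch acceptor = new AcceptorSketch(2, "childHandler", options);
        for (int i = 0; i < 3; i++) {
            ChildChannel child = new ChildChannel();
            acceptor.channelRead(child);
            System.out.println(child.pipeline + " " + child.options + " loop=" + child.eventLoopIndex);
        }
    }
}
```

The point of the model is that every accepted connection receives the same handler and options, while the event loop it is bound to varies from one connection to the next.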

For ChannelInitializer, see the previous Pipeline source code analysis article.

The register logic that follows is similar to NioServerSocketChannel registration, so it is covered only briefly here.

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
    ...
    // Register the NioSocketChannel
    // eventLoop is an EventLoop taken from childGroup
    @Override
    public final void register(EventLoop eventLoop, final ChannelPromise promise) {
        ...
        // Bind eventLoop to NioSocketChannel
        AbstractChannel.this.eventLoop = eventLoop;
        // The calling thread is typically the boss EventLoop thread rather than this eventLoop's thread,
        // so inEventLoop() returns false
        if (eventLoop.inEventLoop()) {
            register0(promise);
        } else {
            try {
                eventLoop.execute(new Runnable() {
                    @Override
                    public void run() {
                        // Call register0() on the eventLoop thread
                        register0(promise);
                    }
                });
            } catch (Throwable t) {
                logger.warn(
                        "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                        AbstractChannel.this, t);
                closeForcibly();
                closeFuture.setClosed();
                safeSetFailure(promise, t);
            }
        }
    }

    // Register
    private void register0(ChannelPromise promise) {
        try {
            ...
            boolean firstRegistration = neverRegistered;
            // Call doRegister()
            doRegister();
            neverRegistered = false;
            registered = true;
            
            pipeline.invokeHandlerAddedIfNeeded();

            safeSetSuccess(promise);
            pipeline.fireChannelRegistered();
            
            // The server NioServerSocketChannel has established a connection with the client NioSocketChannel
            // so NioSocketChannel isActive, isActive() returns true
            if (isActive()) {
                // For a new connection, it is the first registration
                if (firstRegistration) {
                    // Propagate the ChannelActive event
                    pipeline.fireChannelActive();
                } else if (config().isAutoRead()) {
                    beginRead();
                }
            }
            ...
        } catch (Throwable t) {
            // Close the channel directly to avoid FD leak.
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }
    ...
}
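
The "run directly if already on the loop thread, otherwise submit a task" pattern in register() can be reproduced with a single-threaded executor. The sketch below is a minimal model, assuming a hypothetical EventLoopSketch class; a CompletableFuture plays the role of the ChannelPromise and carries the name of the thread that ran register0().

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class EventLoopSketch {
    private volatile Thread thread;
    // One dedicated thread, like a single NioEventLoop.
    private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "child-loop");
        t.setDaemon(true);
        thread = t;
        return t;
    });

    boolean inEventLoop() {
        return Thread.currentThread() == thread;
    }

    // Same branching as register(): run register0() directly when already on
    // the loop thread, otherwise hand it over as a task.
    CompletableFuture<String> register() {
        CompletableFuture<String> promise = new CompletableFuture<>();
        if (inEventLoop()) {
            register0(promise);
        } else {
            executor.execute(() -> register0(promise));
        }
        return promise;
    }

    // Completes the promise with the name of the thread doing the work.
    private void register0(CompletableFuture<String> promise) {
        promise.complete(Thread.currentThread().getName());
    }

    void shutdown() {
        executor.shutdown();
    }

    public static void main(String[] args) {
        EventLoopSketch loop = new EventLoopSketch();
        System.out.println("caller in loop: " + loop.inEventLoop());  // false
        System.out.println("register0 ran on: " + loop.register().join()); // child-loop
        loop.shutdown();
    }
}
```

Routing the work through the loop's own thread is what lets the channel's later I/O and handler callbacks run on a single thread.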

register0() calls doRegister(), which NioSocketChannel inherits from AbstractNioChannel:

public abstract class AbstractNioChannel extends AbstractChannel {
    ...
    @Override
    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                // Register the underlying JDK channel with the selector and attach the NioSocketChannel object
                // The interest set is 0, meaning no events are of interest yet
                selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                return;
            } catch (CancelledKeyException e) {
                ...
            }
        }
    }
    ...
}
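
Registering with an interest set of 0 and an attachment is plain JDK NIO and can be observed directly. In the sketch below, RegisterZeroSketch is a hypothetical name, an unconnected SocketChannel stands in for the accepted connection, and a plain Object stands in for the NioSocketChannel that Netty attaches.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

final class RegisterZeroSketch {
    // Registers a channel with interest ops 0 plus an attachment and returns
    // the key's interest set right after registration.
    static int interestOpsAfterRegister() {
        try (Selector selector = Selector.open();
             SocketChannel ch = SocketChannel.open()) {
            ch.configureBlocking(false);      // register() requires non-blocking mode
            Object attachment = new Object(); // stand-in for the attached channel object
            SelectionKey key = ch.register(selector, 0, attachment);
            if (key.attachment() != attachment) {
                throw new IllegalStateException("attachment was not stored on the key");
            }
            return key.interestOps();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("interestOps after register: " + interestOpsAfterRegister()); // 0
    }
}
```

The key exists and carries the attachment, but until the interest set changes the selector will not report any event for this channel.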

NioSocketChannel registers the OP_READ interest set

Continuing from the analysis above, what happens after the ChannelActive event is propagated is essentially the registration of the OP_READ interest set for the client's NioSocketChannel:

if (isActive()) {
    // For a new connection, it is the first registration
    if (firstRegistration) {
        // Propagate the ChannelActive event
        pipeline.fireChannelActive();
    } else if (config().isAutoRead()) {
        beginRead();
    }
}

The event propagates through the pipeline, and the call eventually reaches doBeginRead() in AbstractNioChannel:

public abstract class AbstractNioChannel extends AbstractChannel {
    ...
    @Override
    protected void doBeginRead() throws Exception {
        // Channel.read() or ChannelHandlerContext.read() was called
        // Save selectionKey to a local variable
        final SelectionKey selectionKey = this.selectionKey;
        // Check that the key is still valid
        if (!selectionKey.isValid()) {
            return;
        }

        readPending = true;

        // Get selectionKey's current interest set, which was set to 0 at registration
        final int interestOps = selectionKey.interestOps();
        // readInterestOp is the value passed in when NioSocketChannel was created:
        // SelectionKey.OP_READ, which is 1
        if ((interestOps & readInterestOp) == 0) {
            // Add SelectionKey.OP_READ to the interest set,
            // i.e. the channel is now interested in read events
            selectionKey.interestOps(interestOps | readInterestOp);
        }
    }
    ...
}
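
The interest-set update in doBeginRead() can be replayed against a real SelectionKey. The sketch below uses only java.nio; BeginReadSketch, beginRead, and demo are illustrative names, and an unconnected SocketChannel stands in for the accepted connection.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

final class BeginReadSketch {
    // Same update as doBeginRead(): add readInterestOp only if it is not set yet.
    static void beginRead(SelectionKey key, int readInterestOp) {
        if (!key.isValid()) {
            return;
        }
        int interestOps = key.interestOps();
        if ((interestOps & readInterestOp) == 0) {
            key.interestOps(interestOps | readInterestOp);
        }
    }

    // Returns the interest set {before, after the first call, after the second call}.
    static int[] demo() {
        try (Selector selector = Selector.open();
             SocketChannel ch = SocketChannel.open()) {
            ch.configureBlocking(false);
            SelectionKey key = ch.register(selector, 0); // registered with no interest, as in doRegister()
            int before = key.interestOps();
            beginRead(key, SelectionKey.OP_READ);
            int afterFirst = key.interestOps();
            beginRead(key, SelectionKey.OP_READ);        // second call changes nothing
            int afterSecond = key.interestOps();
            return new int[] {before, afterFirst, afterSecond};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] ops = demo();
        System.out.println(ops[0] + " -> " + ops[1] + " -> " + ops[2]); // 0 -> 1 -> 1
    }
}
```

Using a bitwise OR rather than a plain assignment preserves any other interest bits that may already be set on the key.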

Summary

  • Where does Netty detect new connections?
  • How are new connections registered to the NioEventLoop thread?
