Introduction

Original article. Please credit the source when reprinting.

Source for this article: netty-source-code-Analysis

The Netty version used in this article is 4.1.6.Final: annotated netty source code

In our “BIO vs NIO” article, we showed a Hello World server written with the JDK’s native NIO. Remember the key steps? Let’s go over them again.

  1. Create a ServerSocketChannel

  2. Set ServerSocketChannel to non-blocking

  3. Bind ServerSocketChannel to port 8000

  4. Register a ServerSocketChannel with a Selector
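
As a reminder of what those four steps look like against the plain JDK API, here is a minimal sketch. The class name NioServerSteps and its run helper are made up for illustration and are not taken from the earlier article; passing port 0 lets the operating system pick a free port.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

// Hypothetical helper class, for illustration only
class NioServerSteps {
    /** Runs the four steps once and reports the resulting channel state. */
    static String run(int port) {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {   // 1. create the channel
            server.configureBlocking(false);                              // 2. make it non-blocking
            server.bind(new InetSocketAddress(port));                     // 3. bind the port
            SelectionKey key = server.register(selector, SelectionKey.OP_ACCEPT); // 4. register
            return "bound=" + server.socket().isBound()
                    + " blocking=" + server.isBlocking()
                    + " ops=" + key.interestOps();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(run(8000));
    }
}
```

Note that the JDK version passes the interest op (OP_ACCEPT) at registration time. Keep that in mind, because Netty splits this into two separate moments.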

Today we’ll trace these key steps through Netty’s source code and see what else Netty does along the way.

1 Server bootstrap code

The following code bootstraps a server; this article refers to it as the “bootstrap code”. It is simple: create two EventLoopGroups, bossGroup and workerGroup; create a ServerBootstrap and pass both groups in; then configure a handler for the Channel that listens on the port. We also set a childHandler, which is used by newly accepted connections. This article does not cover accepting new connections, so the childHandler does nothing here.

Running this code will give you the following result in the console.

HandlerAdded

ChannelRegistered

ChannelActive

package com.zhongdaima.netty.analysis.bootstrap;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.AttributeKey;

/**
 * Welcome to follow the WeChat public account "kind code" for in-depth discussion with the author.
 *
 * @author wangjianxin
 */
public class ServerBoot {
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup(1);
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .attr(AttributeKey.valueOf("ChannelName"), "ServerChannel")
                    // Handler for the server (listening) Channel
                    .handler(new ChannelInboundHandlerAdapter() {
                        @Override
                        public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
                            System.out.println("ChannelRegistered");
                        }

                        @Override
                        public void channelActive(ChannelHandlerContext ctx) throws Exception {
                            System.out.println("ChannelActive");
                        }

                        @Override
                        public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
                            System.out.println("HandlerAdded");
                        }
                    })
                    // Handler for accepted connections; intentionally empty in this article
                    .childHandler(new ChannelInboundHandlerAdapter() {
                    });
            ChannelFuture f = b.bind(8000).sync();
            f.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

2 Startup Process

Starting from the bind call in ChannelFuture f = b.bind(8000).sync(), we follow the code down to the doBind method of AbstractBootstrap. Everything in between is simple: the port number is just wrapped in a SocketAddress.

The key calls inside doBind are initAndRegister on the first line, followed by doBind0.

private ChannelFuture doBind(final SocketAddress localAddress) {
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.isDone()) {
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if (cause != null) {
                    // failure handling omitted
                } else {
                    doBind0(regFuture, channel, localAddress, promise);
                }
            }
        });
        return promise;
    }
}

Let’s step into initAndRegister. It has three key steps: the first is channelFactory.newChannel(), the second is init(channel), and the third is config().group().register(channel).

final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        // Create a Channel
        channel = channelFactory.newChannel();
        // Initialize the Channel
        init(channel);
    } catch (Throwable t) {
        // error handling omitted
    }
    // Register the Channel
    ChannelFuture regFuture = config().group().register(channel);
    // ...
    return regFuture;
}

The entire doBind method is divided into four key steps, which are:

  1. channelFactory.newChannel()
  2. init(channel)
  3. config().group().register(channel)
  4. doBind0
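
The ordering of these four steps can be replayed with a small simulation. This is only a sketch under loud assumptions: it uses the JDK’s CompletableFuture in place of Netty’s ChannelFuture, runs on a single thread, and the class name DoBindFlowSketch is invented for illustration. It shows that step 4 always follows step 3, whether or not registration has finished by the time doBind checks the future.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical simulation; CompletableFuture stands in for Netty's ChannelFuture
class DoBindFlowSketch {
    /**
     * Replays the four steps. When registrationCompletesEarly is false, registration
     * finishes only after the bind callback has been attached to the future.
     */
    static List<String> run(boolean registrationCompletesEarly) {
        List<String> log = new ArrayList<>();
        CompletableFuture<Void> regFuture = new CompletableFuture<>();

        log.add("newChannel");                            // step 1
        log.add("init");                                  // step 2
        if (registrationCompletesEarly) {
            log.add("register");                          // step 3 finished already
            regFuture.complete(null);
        }

        if (regFuture.isDone()) {
            log.add("doBind0");                           // step 4, called directly
        } else {
            regFuture.thenRun(() -> log.add("doBind0"));  // step 4, deferred to a callback
            log.add("register");                          // step 3 finishes later
            regFuture.complete(null);                     // completing the future runs the callback
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(true));
        System.out.println(run(false));
    }
}
```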

Let’s look at each of these four key steps.

2.1 Creating a Channel

channelFactory is a field of AbstractBootstrap. Where is it assigned? In the bootstrap code, when we call b.channel(NioServerSocketChannel.class). That method lives in AbstractBootstrap and is simple enough that we won’t analyze it here. The result is that channelFactory is set to a ReflectiveChannelFactory, which means Channels are created through reflection. Let’s look at its newChannel() method. It is very simple: clazz.newInstance() invokes the no-argument constructor to create the instance.

public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {
    private final Class<? extends T> clazz;

    // constructor omitted

    @Override
    public T newChannel() {
        try {
            return clazz.newInstance();
        } catch (Throwable t) {
            throw new ChannelException("Unable to create Channel from class " + clazz, t);
        }
    }
}
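
The same idea can be tried without Netty on the classpath. The sketch below is a stand-alone analogue, not Netty’s class: ReflectiveFactorySketch and its Factory interface are invented names, and it uses getDeclaredConstructor().newInstance() because Class.newInstance() is deprecated on newer JDKs.

```java
// Hypothetical stand-alone analogue of a reflective factory
class ReflectiveFactorySketch {
    interface Factory<T> {
        T newInstance();
    }

    /** Returns a factory that creates instances through the class's no-argument constructor. */
    static <T> Factory<T> of(Class<? extends T> clazz) {
        return () -> {
            try {
                return clazz.getDeclaredConstructor().newInstance();
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Unable to create instance of " + clazz, e);
            }
        };
    }

    public static void main(String[] args) {
        Factory<CharSequence> factory = of(StringBuilder.class);
        System.out.println(factory.newInstance().getClass().getName());
    }
}
```

The design point is the same as in the bootstrap: the caller only hands over a Class object, and the factory decides when the instance is actually created.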

Next, let’s look at NioServerSocketChannel’s no-argument constructor, which calls newSocket to create a JDK ServerSocketChannel. So here is step 1 from the introduction, “Create a ServerSocketChannel”. The constructor then passes this channel to the parent-class constructor together with the argument SelectionKey.OP_ACCEPT. Remember this argument; it comes up again later.

public NioServerSocketChannel() {
    this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}

private static ServerSocketChannel newSocket(SelectorProvider provider) {
    try {
        return provider.openServerSocketChannel();
    } catch (IOException e) {
        throw new ChannelException("Failed to open a server socket.", e);
    }
}

public NioServerSocketChannel(ServerSocketChannel channel) {
    super(null, channel, SelectionKey.OP_ACCEPT);
    config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}

Let’s follow the call into the parent class constructor, AbstractNioMessageChannel. It does nothing else and simply calls its own superclass constructor.

protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent, ch, readInterestOp);
}

Next we reach the AbstractNioChannel constructor, where we see ch.configureBlocking(false). This is step 2 from the introduction, “Set ServerSocketChannel to non-blocking”. AbstractNioChannel again calls its superclass constructor, so let’s read on.

protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);
    this.ch = ch;
    this.readInterestOp = readInterestOp;
    try {
        // Set the Channel to non-blocking
        ch.configureBlocking(false);
    } catch (IOException e) {
        throw new ChannelException("Failed to enter non-blocking mode.", e);
    }
}
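
Why must the channel be non-blocking before anything else happens? In the JDK, a channel in blocking mode cannot be registered with a Selector at all. The following sketch checks this with plain JDK classes; the class name NonBlockingRequiredSketch and the canRegister helper are invented for illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.IllegalBlockingModeException;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

// Hypothetical demo class, JDK only
class NonBlockingRequiredSketch {
    /** Tries to register a fresh channel with a selector in the given blocking mode. */
    static boolean canRegister(boolean blocking) {
        try (Selector selector = Selector.open();
             ServerSocketChannel ch = ServerSocketChannel.open()) {
            ch.configureBlocking(blocking);
            try {
                ch.register(selector, 0);
                return true;
            } catch (IllegalBlockingModeException e) {
                // The JDK rejects registration of a channel that is in blocking mode
                return false;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("blocking channel registers: " + canRegister(true));
        System.out.println("non-blocking channel registers: " + canRegister(false));
    }
}
```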

Finally we arrive at the constructor of AbstractChannel, which creates an id, an Unsafe, and a pipeline for the Channel. We’ll talk about Unsafe and the pipeline later.

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId();
    unsafe = newUnsafe();
    pipeline = newChannelPipeline();
}

2.2 Initializing a Channel

Let’s go back to AbstractBootstrap’s initAndRegister method and move on to init(channel), which is an abstract method implemented in ServerBootstrap.

The main logic of init is to set the Channel’s options and attributes and to add the handler we configured in the bootstrap code. Finally, it adds a ServerBootstrapAcceptor which, as the name implies, is a handler that accepts new connections.

We’ll cover ServerBootstrapAcceptor in a later chapter, so we skip it here. As for why it is added through ch.eventLoop().execute instead of being added directly, there is a brief hint in the code comments. In fact, in the current version, adding it directly works fine. I’ll demonstrate that in the video tutorial, so stay tuned.

void init(Channel channel) throws Exception {
    // Set the Channel options, which we configured in the bootstrap code with .option(ChannelOption.TCP_NODELAY, true)
    final Map<ChannelOption<?>, Object> options = options0();
    synchronized (options) {
        channel.config().setOptions(options);
    }
    // Set the Channel attributes, which we configured in the bootstrap code with .attr(AttributeKey.valueOf("ChannelName"), "ServerChannel")
    final Map<AttributeKey<?>, Object> attrs = attrs0();
    synchronized (attrs) {
        for (Entry<AttributeKey<?>, Object> e : attrs.entrySet()) {
            @SuppressWarnings("unchecked")
            AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
            channel.attr(key).set(e.getValue());
        }
    }

    ChannelPipeline p = channel.pipeline();

    // (declarations of currentChildGroup, currentChildHandler, currentChildOptions, and currentChildAttrs omitted)

    // The anonymous ChannelInitializer is added to the pipeline immediately, whether or not we are on the EventLoop thread.
    // Its initChannel method, however, is called from handlerAdded, which may only run on the EventLoop thread.
    // The Channel is not yet bound to an EventLoop, so the handlerAdded call is wrapped as a task and appended to the pipeline's pendingHandlerCallback list.
    // Once the Channel is bound to an EventLoop, the task is taken from that list and executed.
    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(Channel ch) throws Exception {
            final ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = config.handler();
            if (handler != null) {
                // Add the handler we configured in the bootstrap code
                pipeline.addLast(handler);
            }

            // You may wonder why pipeline.addLast is not called directly here.
            // Issue link: https://github.com/netty/netty/issues/5566
            // In the current version, ChannelInitializer's handlerAdded method is what makes a direct pipeline.addLast work as well.
            // Compare 4.0.39.Final with 4.1.6.Final to see the difference.
            // Add the ServerBootstrapAcceptor
            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}

2.3 Binding an EventLoop and registering the Channel with the Selector

Let’s go back to AbstractBootstrap’s initAndRegister method once more and look at ChannelFuture regFuture = config().group().register(channel). This is where the Channel is registered, so let’s step in.

config().group() returns the bossGroup we set up in the bootstrap code. Since there is only one server Channel, the bossGroup needs only one EventLoop.

Following register(channel): the method is abstract, and the concrete implementation is in MultithreadEventLoopGroup. Let’s step in.

@Override
public ChannelFuture register(Channel channel) {
    return next().register(channel);
}

The next() method delegates to the next() method of EventExecutorChooser to select an EventLoop. EventExecutorChooser has two implementations, PowerOfTowEventExecutorChooser and GenericEventExecutorChooser. Both use a round-robin strategy and differ only in how they compute the index. If the number of EventLoops in the EventLoopGroup is a power of 2, PowerOfTowEventExecutorChooser is used; otherwise GenericEventExecutorChooser is used.

PowerOfTowEventExecutorChooser uses a bitwise AND.

@Override
public EventExecutor next() {
    return executors[idx.getAndIncrement() & executors.length - 1];
}

GenericEventExecutorChooser uses the modulo (remainder) operation.

@Override
public EventExecutor next() {
    return executors[Math.abs(idx.getAndIncrement() % executors.length)];
}

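As the EventLoop selection algorithm shows, Netty takes every opportunity to squeeze out performance: when the number of EventLoops is a power of 2, a cheap bitwise AND replaces the modulo operation.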
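
To see that the two index computations agree, here is a small stand-alone sketch. It works on plain ints rather than on Netty’s executor array, and the class name ChooserSketch and its helper methods are invented for illustration. The power-of-two test shown is one common bit trick and is an assumption of this sketch, not a quote from the source.

```java
// Hypothetical stand-alone sketch of the two round-robin index computations
class ChooserSketch {
    /** Common bit trick: a power of two has exactly one bit set. */
    static boolean isPowerOfTwo(int val) {
        return (val & -val) == val;
    }

    /** Index via bitwise AND; only valid when length is a power of two. */
    static int maskIndex(int counter, int length) {
        return counter & length - 1;   // '-' binds tighter than '&'
    }

    /** Index via remainder; works for any positive length. */
    static int modIndex(int counter, int length) {
        return Math.abs(counter % length);
    }

    static int choose(int counter, int length) {
        return isPowerOfTwo(length) ? maskIndex(counter, length) : modIndex(counter, length);
    }

    public static void main(String[] args) {
        for (int counter = 0; counter < 10; counter++) {
            System.out.println(counter + " -> " + choose(counter, 4) + " (of 4), "
                    + choose(counter, 3) + " (of 3)");
        }
    }
}
```

For non-negative counters and a power-of-two length, both computations yield the same index. They differ once the counter overflows to a negative value, but each still stays within the array bounds.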

The chooser field is assigned in the constructor of MultithreadEventExecutorGroup, where it is created through chooserFactory.

protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                        EventExecutorChooserFactory chooserFactory, Object... args) {
    chooser = chooserFactory.newChooser(children);
}

chooserFactory itself is supplied by another constructor of MultithreadEventExecutorGroup. When we create an EventLoopGroup with new NioEventLoopGroup(1) in the bootstrap code, that constructor is eventually called, and the default value is DefaultEventExecutorChooserFactory.INSTANCE.

protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
    this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}

The EventLoop selected by next() is a SingleThreadEventLoop, so we follow its register method, which finally calls the register method of unsafe.

@Override
public ChannelFuture register(Channel channel) {
    return register(new DefaultChannelPromise(channel, this));
}

@Override
public ChannelFuture register(final ChannelPromise promise) {
    ObjectUtil.checkNotNull(promise, "promise");
    promise.channel().unsafe().register(this, promise);
    return promise;
}

The unsafe.register method is implemented in io.netty.channel.AbstractChannel.AbstractUnsafe, so let’s step in. register does two main things: it binds the EventLoop to the Channel, and it calls register0. Here the calling thread is not the EventLoop thread, so register0 is submitted as an asynchronous task.

@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    // Bind the EventLoop
    AbstractChannel.this.eventLoop = eventLoop;
    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        // The current thread is not the EventLoop thread, so submit register0 as a task
        try {
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            // error handling omitted
        }
    }
}
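
The “run inline if already on the loop thread, otherwise hand off to the loop” pattern can be sketched with a JDK single-thread executor. This is only an analogue under stated assumptions: InEventLoopSketch, its register method, and the thread name sketch-event-loop are all invented for illustration and are not Netty APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical analogue of an event loop built on a single-thread executor
class InEventLoopSketch {
    private volatile Thread loopThread;
    private final ExecutorService executor = Executors.newSingleThreadExecutor(task -> {
        Thread t = new Thread(task, "sketch-event-loop");
        t.setDaemon(true);
        loopThread = t;   // remember which thread is "the loop"
        return t;
    });

    boolean inEventLoop() {
        return Thread.currentThread() == loopThread;
    }

    /** Runs the task inline when called on the loop thread, otherwise submits it to the loop. */
    void register(Runnable register0) {
        if (inEventLoop()) {
            register0.run();
        } else {
            executor.execute(register0);
        }
    }

    /** Calls register from the caller's thread, then again from inside the loop, and logs the order. */
    static List<String> demo() {
        InEventLoopSketch loop = new InEventLoopSketch();
        List<String> log = new ArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        loop.register(() -> {
            log.add("register0 on " + Thread.currentThread().getName());
            loop.register(() -> log.add("nested call ran inline"));
            log.add("outer task finished");
            done.countDown();
        });
        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            loop.executor.shutdown();
        }
        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

The first call comes from the caller’s thread and is handed to the loop; the nested call is already on the loop thread, so it runs immediately instead of being queued behind the current task.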

There are three main steps within the register0 method.

The first step is doRegister(), which we’ll look at shortly.

Step 2 is pipeline.invokeHandlerAddedIfNeeded(). It runs the handler-added callbacks that were deferred because the handlers were added before an EventLoop was bound. For example, we added a ChannelInitializer, and our handler is added inside its initChannel method. initChannel is called from handlerAdded, which must run on the EventLoop, so before an EventLoop is bound the call is wrapped as a pending task.

These pending tasks are kept in the pipeline’s pendingHandlerCallbackHead, which is a linked list. For details, see addLast(EventExecutorGroup group, String name, ChannelHandler handler) in DefaultChannelPipeline.

This step invokes System.out.println(“HandlerAdded”) in our bootstrap code and prints “HandlerAdded” to the console.

Step 3 fires the ChannelRegistered event. This invokes System.out.println(“ChannelRegistered”) in our bootstrap code and prints “ChannelRegistered” to the console.

So now we know why our bootstrap code prints “HandlerAdded” first and “ChannelRegistered” second.
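
The deferral described above can be modelled in a few lines. This is a deliberately simplified sketch, not Netty’s pipeline: the class PendingCallbackSketch, its addLast and register0 methods, and the event strings are all invented for illustration, and everything runs on one thread.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, single-threaded model of deferred handlerAdded callbacks
class PendingCallbackSketch {
    private final List<Runnable> pendingCallbacks = new ArrayList<>();
    private final List<String> events = new ArrayList<>();
    private boolean registered;

    /** Adds a handler; its handlerAdded callback is deferred until the channel is registered. */
    void addLast(String handlerName) {
        Runnable handlerAdded = () -> events.add("HandlerAdded:" + handlerName);
        if (registered) {
            handlerAdded.run();
        } else {
            pendingCallbacks.add(handlerAdded);
        }
    }

    void register0() {
        registered = true;
        // Plays the role of pipeline.invokeHandlerAddedIfNeeded()
        for (Runnable callback : pendingCallbacks) {
            callback.run();
        }
        pendingCallbacks.clear();
        // Plays the role of pipeline.fireChannelRegistered()
        events.add("ChannelRegistered");
    }

    List<String> events() {
        return events;
    }

    public static void main(String[] args) {
        PendingCallbackSketch pipeline = new PendingCallbackSketch();
        pipeline.addLast("serverHandler");
        pipeline.register0();
        pipeline.events().forEach(System.out::println);
    }
}
```

Because the pending callbacks are drained before the registered event is fired, the handler-added line always comes first, which matches the console output of the bootstrap code.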

Further down is the isActive() check, which ultimately calls the isBound method of the JDK ServerSocket class. We won’t paste the code here; it is simple enough for readers to check themselves. Obviously the port has not been bound yet, so the body of the if branch does not execute.

    private void register0(ChannelPromise promise) {
        try {
            // Register the Channel with the Selector
            doRegister();

            // Run the handlerAdded callbacks that were deferred before the EventLoop was bound.
            // They are kept in the pipeline's pendingHandlerCallbackHead linked list; see
            // addLast(EventExecutorGroup group, String name, ChannelHandler handler) in DefaultChannelPipeline.
            pipeline.invokeHandlerAddedIfNeeded();

            // Mark the promise as successful
            safeSetSuccess(promise);

            // Fire the ChannelRegistered event
            pipeline.fireChannelRegistered();

            // The Channel is not active yet because the port is not bound, so this branch is skipped
            if (isActive()) {
                if (firstRegistration) {
                    pipeline.fireChannelActive();
                } else if (config().isAutoRead()) {
                    beginRead();
                }
            }
        } catch (Throwable t) {
            // error handling omitted
        }
    }

Let’s follow the doRegister method, which is an abstract method, and in this case the method is implemented in AbstractNioChannel. Ok, so this is where we finally get to step 4: registering a Channel with a Selector.

    @Override
    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                // Register with interest ops 0; the real interest op is set later
                selectionKey = javaChannel().register(eventLoop().selector, 0, this);
                return;
            } catch (CancelledKeyException e) {
                // retry handling omitted
            }
        }
    }

At this point, of the four steps from the introduction, only step 3 has not appeared yet. Where is it? Let’s move on.

2.4 Binding Port Number

Earlier we said that doBind makes two important calls, initAndRegister and doBind0. We have already analyzed initAndRegister, so let’s look at doBind0. Because register inside initAndRegister is asynchronous, registration may or may not be complete when initAndRegister returns. Hence the check: if registration is complete, doBind0 is called directly; if not, the doBind0 call is placed in a listener on regFuture, and the EventLoop thread invokes that listener once registration completes.

A listener is invoked when the promise’s setSuccess or setFailure is called. Recall the AbstractUnsafe.register0 method above, which contains the call safeSetSuccess(promise).

private ChannelFuture doBind(final SocketAddress localAddress) {
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.isDone()) {
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if (cause != null) {
                    // failure handling omitted
                } else {
                    doBind0(regFuture, channel, localAddress, promise);
                }
            }
        });
        return promise;
    }
}

Between the isDone() check and the addListener call, the promise may already have completed. Could the listener then never be called back? The answer lies in DefaultPromise’s addListener(GenericFutureListener<? extends Future<? super V>> listener): if the promise is already complete, it calls notifyListeners directly, which submits an asynchronous task to the EventLoop (the EventLoop binding is complete by then). That task invokes the listener that was just registered.

@Override
public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
    synchronized (this) {
        addListener0(listener);
    }
    if (isDone()) {
        notifyListeners();
    }
    return this;
}
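
The “add first, then check whether it is already done” idea is easy to reproduce in isolation. The sketch below is a simplified stand-in, not DefaultPromise: TinyPromise and its methods are invented for illustration, and listeners run on the calling thread rather than being handed to an EventLoop.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical minimal promise; listeners run on the calling thread, unlike in Netty
class TinyPromise {
    private final List<Runnable> listeners = new ArrayList<>();
    private boolean done;

    /** Registers the listener, then notifies right away if the promise is already complete. */
    void addListener(Runnable listener) {
        synchronized (this) {
            listeners.add(listener);
        }
        if (isDone()) {
            notifyListeners();
        }
    }

    void setSuccess() {
        synchronized (this) {
            done = true;
        }
        notifyListeners();
    }

    synchronized boolean isDone() {
        return done;
    }

    /** Drains the listener list under the lock so that each listener runs exactly once. */
    private void notifyListeners() {
        List<Runnable> batch;
        synchronized (this) {
            batch = new ArrayList<>(listeners);
            listeners.clear();
        }
        batch.forEach(Runnable::run);
    }

    public static void main(String[] args) {
        TinyPromise promise = new TinyPromise();
        promise.addListener(() -> System.out.println("listener added before completion"));
        promise.setSuccess();
        promise.addListener(() -> System.out.println("listener added after completion"));
    }
}
```

Whichever side wins the race, the listener ends up in the list before the done flag is checked, so a listener added around the moment of completion is not lost.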

Let’s go back to the doBind0 method, which calls channel.bind, implemented in AbstractChannel.

private static void doBind0(
        final ChannelFuture regFuture, final Channel channel,
        final SocketAddress localAddress, final ChannelPromise promise) {

    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}

The bind method in AbstractChannel calls pipeline.bind. Remember the diagram from the “Overall Architecture of Netty” article? Let’s show it again.

pipeline.bind first calls bind on the tail, and the call then propagates to bind on the head. We’ll see how this propagation works when we discuss the pipeline.

@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    return pipeline.bind(localAddress, promise);
}

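
Until the pipeline article, the direction of travel can be pictured with a toy linked list. This is a rough sketch and not Netty’s implementation: OutboundPropagationSketch, its Ctx class, and the handler names are invented, and the real pipeline skips contexts that do not handle outbound events.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy pipeline: outbound operations travel from tail towards head
class OutboundPropagationSketch {
    static final class Ctx {
        final String name;
        Ctx prev;

        Ctx(String name) {
            this.name = name;
        }

        /** Records this context, then passes the bind call to the previous context. */
        void bind(List<String> trace) {
            trace.add(name);
            if (prev != null) {
                prev.bind(trace);
            }
        }
    }

    /** Builds head -> handlers... -> tail and returns the order in which bind visits them. */
    static List<String> bindTrace(String... handlerNames) {
        Ctx head = new Ctx("head");
        Ctx last = head;
        for (String handlerName : handlerNames) {
            Ctx ctx = new Ctx(handlerName);
            ctx.prev = last;
            last = ctx;
        }
        Ctx tail = new Ctx("tail");
        tail.prev = last;

        List<String> trace = new ArrayList<>();
        tail.bind(trace);   // like pipeline.bind(...) starting at the tail
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(bindTrace("handlerA", "handlerB"));
    }
}
```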

So here we go straight to HeadContext’s bind method and see that it calls unsafe.bind. Earlier, when the Channel was registered with the Selector, the call also ended up in unsafe. For now it is enough to know that Unsafe is the class in Netty that deals most directly with the underlying channel: every operation on a Channel eventually lands in Unsafe. We’ll discuss it in more detail later.

@Override
public void bind(
        ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
        throws Exception {
    unsafe.bind(localAddress, promise);
}

Let’s step into AbstractUnsafe’s bind method. It has two main steps. The first step is to call doBind, which binds the port. The second step is to fire the ChannelActive event. This step is guarded by an isActive check; at this point the port binding is complete, so isActive() returns true. This step invokes System.out.println(“ChannelActive”) in our bootstrap code and prints “ChannelActive” to the console.

@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
    boolean wasActive = isActive();
    try {
        doBind(localAddress);
    } catch (Throwable t) {
        // error handling omitted
    }
    if (!wasActive && isActive()) {
        invokeLater(new Runnable() {
            @Override
            public void run() {
                pipeline.fireChannelActive();
            }
        });
    }
}
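
Since isActive() for the server channel comes down to the JDK’s ServerSocket.isBound(), the wasActive/isActive transition can be observed with plain JDK code. This is a small sketch; the class name BoundStateSketch is invented, and it binds to the loopback address on port 0 so the operating system picks a free port.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;

// Hypothetical demo class, JDK only
class BoundStateSketch {
    /** Returns the bound state before and after binding, mirroring wasActive and isActive(). */
    static boolean[] boundBeforeAndAfterBind() {
        try (ServerSocketChannel ch = ServerSocketChannel.open()) {
            boolean wasActive = ch.socket().isBound();
            ch.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            boolean isActive = ch.socket().isBound();
            return new boolean[] {wasActive, isActive};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] state = boundBeforeAndAfterBind();
        System.out.println("wasActive=" + state[0] + ", isActive=" + state[1]);
    }
}
```

Only the transition from unbound to bound makes the condition !wasActive && isActive() true, which is why ChannelActive fires exactly once, after the bind.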

The doBind method is implemented in NioServerSocketChannel. Here, at last, is step 3 from the introduction, “Bind ServerSocketChannel to port 8000”. With that, the server has started.

protected void doBind(SocketAddress localAddress) throws Exception {
    if (PlatformDependent.javaVersion() >= 7) {
        javaChannel().bind(localAddress, config.getBacklog());
    } else {
        javaChannel().socket().bind(localAddress, config.getBacklog());
    }
}

2.4.1 Registering an Interest Event

But something seems to be missing. When we wrote against the JDK API, we passed the interest event when registering with the selector. Why haven’t we seen the interest event registered here? Let’s return to AbstractUnsafe’s bind method, which at the end calls pipeline.fireChannelActive(). Below is the pipeline’s fireChannelActive method. It calls AbstractChannelHandlerContext.invokeChannelActive(head), where head is the HeadContext from our “Overall Architecture of Netty” diagram.

@Override
public final ChannelPipeline fireChannelActive() {
    AbstractChannelHandlerContext.invokeChannelActive(head);
    return this;
}

static void invokeChannelActive(final AbstractChannelHandlerContext next) {
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelActive();
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelActive();
            }
        });
    }
}

HeadContext’s channelActive method is shown below. The secret is in readIfIsAutoRead, which eventually calls channel.read().

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    ctx.fireChannelActive();
    readIfIsAutoRead();
}

private void readIfIsAutoRead() {
    if (channel.config().isAutoRead()) {
        channel.read();
    }
}

In AbstractChannel, the channel.read method calls pipeline.read.

@Override
public Channel read() {
    pipeline.read();
    return this;
}

The pipeline’s read method is shown below. It calls read on the tail, and the call eventually propagates to read on the head. We’ll see how it propagates when we discuss the pipeline. For now, let’s go straight to HeadContext’s read method.

@Override
public final ChannelPipeline read() {
    tail.read();
    return this;
}

HeadContext’s read method calls unsafe.beginRead().

@Override
public void read(ChannelHandlerContext ctx) {
    unsafe.beginRead();
}

The beginRead method is implemented in AbstractUnsafe, where it calls doBeginRead.

@Override
public final void beginRead() {
    try {
        doBeginRead();
    } catch (final Exception e) {
    
    }
}

The doBeginRead method is implemented in AbstractNioChannel. It modifies the selectionKey’s interest set by combining the existing interest ops, interestOps, with readInterestOp.

interestOps is the existing interest set, which is the 0 passed in javaChannel().register(eventLoop().selector, 0, this) when registering with the Selector.

And where was readInterestOp set? Remember the “Creating a Channel” section of this article?

@Override
protected void doBeginRead() throws Exception {
    // Channel.read() or ChannelHandlerContext.read() was called
    final SelectionKey selectionKey = this.selectionKey;
    if (!selectionKey.isValid()) {
        return;
    }

    readPending = true;

    final int interestOps = selectionKey.interestOps();
    if ((interestOps & readInterestOp) == 0) {
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}

When NioServerSocketChannel calls its parent-class constructor, it passes the interest-op argument SelectionKey.OP_ACCEPT.

public NioServerSocketChannel(ServerSocketChannel channel) {
    super(null, channel, SelectionKey.OP_ACCEPT);
    config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
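
The two-phase approach, registering with interest ops 0 first and enabling OP_ACCEPT later, can be reproduced with the JDK API alone. This is a minimal sketch; the class name InterestOpsSketch and its helper method are invented for illustration, and the local variable readInterestOp merely mirrors the field name used above.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

// Hypothetical demo class, JDK only
class InterestOpsSketch {
    /** Returns the interest ops right after registration and after enabling accept. */
    static int[] registerThenEnableAccept() {
        try (Selector selector = Selector.open();
             ServerSocketChannel ch = ServerSocketChannel.open()) {
            ch.configureBlocking(false);
            SelectionKey key = ch.register(selector, 0);       // like doRegister(): no interest yet
            int before = key.interestOps();

            int readInterestOp = SelectionKey.OP_ACCEPT;       // the value passed up the constructor chain
            if ((before & readInterestOp) == 0) {
                key.interestOps(before | readInterestOp);      // like doBeginRead()
            }
            return new int[] {before, key.interestOps()};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] ops = registerThenEnableAccept();
        System.out.println("after register: " + ops[0] + ", after beginRead: " + ops[1]);
    }
}
```

Separating the two moments means the selector does not report accept events until the channel is active and the pipeline is ready to handle them.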

After all these twists and turns, we have finally tracked down this one small detail. Why is the path so long? It may seem hard to follow, but don’t worry: keep following my articles, and it will become clear once we discuss the pipeline.

3 Summary

Netty server startup process:

  1. Create the Channel: create a Channel instance, set it to non-blocking, and create a pipeline and an Unsafe instance for it.

  2. Initialize the Channel: set its options and attributes, and add the special handler ServerBootstrapAcceptor.

  3. Register the Channel: bind an EventLoop to it and register it with the Selector.

  4. Bind the port.


About the author

Wang Jianxin is a senior Java engineer in the architecture department, mainly responsible for service governance, the RPC framework, distributed call tracing, and monitoring systems. He loves technology and learning, and welcomes contact and discussion.
