Author: LemonNan

preface

It has been over a year since we last wrote the Netty source code, and the current version of the Netty source code is 4.1.31, which is not exactly the same as the original version. Smaller versions of the source code do not differ much in the main flow. This analysis of the source is to use Netty to start a server process.

The process that

Io.net ty.example. Echo: io.netty.example. Echo: io.netty.example. Echo: io.netty.example.

Demo

      	// Configure the server.
        // Thread pool initialization
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        final EchoServerHandler serverHandler = new EchoServerHandler();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .option(ChannelOption.SO_BACKLOG, 100)
             .handler(new LoggingHandler(LogLevel.INFO))
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception{
                     ChannelPipeline p = ch.pipeline();
                     //p.addLast(new LoggingHandler(LogLevel.INFO));p.addLast(serverHandler); }});// Start the server.
            // Wait for startup to complete
            ChannelFuture f = b.bind(PORT).sync();

            // Wait until the server socket is closed.
            // Wait to close
            f.channel().closeFuture().sync();
        }finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
Copy the code

This is to start a simple I/O server, the original code has an SSL configuration, removed here.

Before parsing the above server-side startup code, let’s recall the logic of writing server-side code in NIO.

NIO process

// These are the steps
/ / 1. To create
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
/ / 2. Binding
serverSocketChannel.bind(new InetSocketAddress(port));
// 3. Register events
this.selector = Selector.open();
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
Copy the code

In general, the NIO process is the following three steps:

  • Create ServerSockerChannel and set it to non-blocking (because it is blocked by default) (Create ServerSocketChannel)
  • Bind port
  • Register events of interest

After knowing the process written with NIO before, analyze the Echo demo given by Netty.

The code before ChannelFuture f = b.bind(PORT).sync() simply initializes some server parameters with a chain and waits until bind is ready to start.

Next, look at the bind method.

bind

So first of all, when YOU bind, you pass in a port number, which you’re probably familiar with.

public ChannelFuture bind(int inetPort) {
  return bind(new InetSocketAddress(inetPort));
}
Copy the code

And then we call

public ChannelFuture bind(SocketAddress localAddress) {
 	  // Check some initialization parameters
    validate();
    if (localAddress == null) {
        throw new NullPointerException("localAddress");
    }
    return doBind(localAddress);
}
Copy the code

validate()

Parameter verification method validate()

public B validate(a) {
    if (group == null) {
        throw new IllegalStateException("group not set");
    }
    if (channelFactory == null) {
        throw new IllegalStateException("channel or channelFactory not set");
    }
    return self();
}
Copy the code

First, check the boss thread pool, then check the initialization of the channel factory, which corresponds to the b.group(bossGroup, workerGroup) and What is it?

Continue to see, it is a Channel factory, should be to create the Channel, at the time of initialization parameters, also have code to set the Channel information Channel (NioServerSocketChannel. Class)

The channel code (Class)

public B channel(Class<? extends C> channelClass) {
  if (channelClass == null) {
    throw new NullPointerException("channelClass");
  }
  return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}
Copy the code

Finally, call the following method

public B channelFactory(ChannelFactory<? extends C> channelFactory) {
  if (channelFactory == null) {
    throw new NullPointerException("channelFactory");
  }
  if (this.channelFactory ! =null) {
    throw new IllegalStateException("channelFactory set already");
  }

  this.channelFactory = channelFactory;
  return self();
}
Copy the code

As you can see here, the channelFactory is a ReflectiveChannelFactory.

Next comes the doBind(localAddress) method called in bind

doBind

    private ChannelFuture doBind(final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if(regFuture.cause() ! =null) {
            return regFuture;
        }
				/ /... DoBind0 () is called after the next bunch of operations, which is simplified by doBind0
        doBind0(regFuture, channel, localAddress, promise);
    }
Copy the code

In order to make the code easier to read, some of the middle logical code processing is omitted.

Initialize and register initAndRegister() to determine if it is abnormal, and then call doBind0.

initAndRegister

final ChannelFuture initAndRegister(a) {
  		channel = channelFactory.newChannel();
  		init(channel);
  		ChannelFuture regFuture = config().group().register(channel);
  		return regFuture;
}
Copy the code

Here, too, a lot of code is omitted to extract the main flow.

  • Create ServerChannel channelFactory. NewChannel ()
  • Init (channel) initialization
  • Call registration method

Create a channel

Here is to use channelFactory. NewChannel () for channel creation, already know from the above code, the Factory is ReflectiveChannelFactory, Look at its newChannel() method:

	@Override
	public T newChannel(a) {
        try {
            return clazz.getConstructor().newInstance();
        } catch (Throwable t) {
            throw new ChannelException("Unable to create Channel from class "+ clazz, t); }}Copy the code

Here is according to the class using reflection to create the Channel that we initially incoming NioServerSocketChannel. Class

Init initialization

ServerBootstrap init method

void init(Channel channel) throws Exception
Copy the code

Personally feel this method, there seems to be nothing good to say..

Registration method

Register calls SingleThreadEventLoop’s register method, and finally calls AbstractChannel’s register(EventLoop, ChannelPromise) -> register0(ChannelPromise) -> AbstractNioChannel#doRegister()

In the doRegister() method, the Java NIO registration is finally called

selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0.this);
Copy the code

At this point, registration and initialization are complete, and the subsequent doBind0() is called

There is a problem with the process, which is some of its processing when the doRegister() method is finally called.

private void register0(ChannelPromise promise) {
	doRegister();
	pipeline.fireChannelRegistered();
	if (firstRegistration) {
		pipeline.fireChannelActive();
	}else{ beginRead(); }}Copy the code
  • Perform the Java NIO registration
  • Registered event that calls the chain, which is an inbound event
  • Call the active event, which is also the inbound event, and the same channel will only trigger one active event, the same channel will call the deregister event and register again, will not trigger multiple active events. But registered fires multiple times.

doBind0()

@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    return pipeline.bind(localAddress, promise);
}
Copy the code

AbstractChannel#bind -> pipeline#bind -> tail.bind -> invokeBind

private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
    if (invokeHandler()) {
        try {
          // Finally switch to here,
            ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
        } catch(Throwable t) { notifyOutboundHandlerException(t, promise); }}else{ bind(localAddress, promise); }}Copy the code

If you have a custom Handle that implements the ChannelOutboundHandler interface, the final event is passed to the custom Handle’s bind method.

After all of this Handler’s bind is called, you will eventually call the bind method of the outermost HeadContext that Netty automatically added for you because it is both inbound and outbound.

public void bind( ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
        throws Exception {
    unsafe.bind(localAddress, promise);
}
Copy the code

HeadContext bind method – > AbstractChannel# bind – > NioServerSocketChannel. DoBind (this is one of the subclass implementation).

    @Override
    protected void doBind(SocketAddress localAddress) throws Exception {
        if (PlatformDependent.javaVersion() >= 7) {
            javaChannel().bind(localAddress, config.getBacklog());
        } else{ javaChannel().socket().bind(localAddress, config.getBacklog()); }}Copy the code

Ultimately, it’s the bind method of the javaChannel() call.

The process of channel creation, event registration, and binding ends here, and it’s about another thing in Netty: the Future.

sync()

Remember the sync() method after bind above?

ChannelFuture f = b.bind(PORT).sync();

Sync () blocks the current thread until the server starts up, which depends on the Future. The implementation of this block is relatively simple, using a simple loop determination. The final call code is as follows:

    @Override
    public Promise<V> await(a) throws InterruptedException {
        if (isDone()) {
            return this;
        }

        if (Thread.interrupted()) {
            throw new InterruptedException(toString());
        }

        checkDeadLock();
        // Wait indefinitely until done
        synchronized (this) {
            while(! isDone()) { incWaiters();try {
                    wait();
                } finally{ decWaiters(); }}}return this;
    }
Copy the code

If sync() is called, the final call to DefaultPromise#await() will block the current call to the thread. DefaultPromise stores the result of the execution, which is volatile. So the other thread has changed the result, and you can see that from the beginning of the bind call all the way through, and I’m sure you can see that Promise is passing, in order to set the result into the Promise, and then change it to succeed or fail, and all the other threads can see that.

Here is a source code interpretation of the Future/Promise I wrote earlier

Netty asynchronous Future source code interpretation

The last

This article on the Netty server startup process ends here.

Last year,

  • Netty asynchronous Future source code interpretation
  • Netty high-performance ByteBuf source code parsing
  • Netty usage and event passing