preface

The last article looked at the various core components in Netty. How do these core components work together

Server startup code

Let’s start with a bit of server startup code

public class NettyServer { public static void main(String[] args) throws InterruptedException { // 1. Create two thread groups: bossGroup and workerGroup // 2. BossGroup only handles connection requests. The real and client business processing will be assigned to workerGroup // 3. BossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); ServerBootstrap = new ServerBootstrap(); ServerBootstrap = new ServerBootstrap(); // Set two thread groups bootstrap.group(bossGroup, WorkerGroup) / / using NioServerSocketChannel as the server channel implementation. The channel (NioServerSocketChannel. Class) / / set the thread queue connection number .option(channeloption.so_backlog, 128) // Set to keep active connections. ChildOption (channeloption.so_keepalive, ChildHandler (new ChannelInitializer<SocketChannel>() {@override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new NettyServerHandler()); }}); System.out.println("server is ready"); ChannelFuture cf = bootstrap.bind(6668).sync(); ChannelFuture cf = bootstrap.bind(6668).sync() Cf.channel ().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); }}}Copy the code

Setting server parameters

bootstrap.group(bossGroup, workerGroup)
Copy the code
public ServerBootstrap group(EventLoopGroup parentGroup, Class AbstractBootstrap super.group(parentGroup); if (this.childGroup ! = null) { throw new IllegalStateException("childGroup set already"); } this.childGroup = ObjectUtil.checkNotNull(childGroup, "childGroup"); return this; }Copy the code

The bootstrap.group method mainly assigns values to properties. In AbstractBootstrap, there are member variables of group (bossGroup) and ServerBootstrap member variables of childGroup (workerGroup). These two groups are a bit like our daily work. BossGroup is the boss who desperately receives work from outside (constantly receiving new connections), while childGroup is the worker who desperately works overtime (handling business operations of connected clients).

bootstrap.channel(NioServerSocketChannel.class)
Copy the code

Bootstrap. channel is to set up a customer service specialist. Whenever there is a new connection request, a new channel will be created, through which the client and the server can communicate with each other

public B channel(Class<? extends C> channelClass) {
    return channelFactory(new ReflectiveChannelFactory<C>(
      ObjectUtil.checkNotNull(channelClass, "channelClass")
    ));
}
Copy the code
public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> { private final Constructor<? extends T> constructor; Public ReflectiveChannelFactory(Class<? extends T> clazz) { ObjectUtil.checkNotNull(clazz, "clazz"); try { this.constructor = clazz.getConstructor(); } catch (NoSuchMethodException e) { throw new IllegalArgumentException("Class " + StringUtil.simpleClassName(clazz) + " does not have a public non-arg constructor", e); @override public T newChannel() {try {return constructor.newinstance (); } catch (Throwable t) { throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t); } } @Override public String toString() { return StringUtil.simpleClassName(ReflectiveChannelFactory.class) + '(' + StringUtil.simpleClassName(constructor.getDeclaringClass()) + ".class)"; }}Copy the code

View the source code, find the bootstrap. Channel (NioServerSocketChannel. Class) use the factory pattern, subsequent use newChannel method can be constructed by reflecting way NioServerSocketChannel instance

bootstrap.option(ChannelOption.SO_BACKLOG, 128)
         .childOption(ChannelOption.SO_KEEPALIVE, true)
Copy the code

The option() method sets variable parameters. Option is set in the parent class AbstractBootstrap. ChildOption is set in the ServerBootstrap class

public <T> B option(ChannelOption<T> option, T value) {
    ObjectUtil.checkNotNull(option, "option");
    synchronized (options) {
      if (value == null) {
        options.remove(option);
      } else {
        options.put(option, value);
      }
    }
    return self();
}
​
​
public <T> ServerBootstrap childOption(ChannelOption<T> childOption, T value) {
    ObjectUtil.checkNotNull(childOption, "childOption");
    synchronized (childOptions) {
      if (value == null) {
        childOptions.remove(childOption);
      } else {
        childOptions.put(childOption, value);
      }
    }
    return this;
}
Copy the code
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new NettyServerHandler()); }});Copy the code

This is also the place to set the parameters in ServerBootstrap

doBind

As we all know, NIO has three components: Buffer, Channel, and Selector. Netty wraps NIO to make it easier to use, so NIO’s operation flow should be similar to Netty’s theoretically. In NIO, channels need to be registered with Selector. In the last article, I wrote that during initialization of a NioEventLoopGroup, each NioEventLoop creates a Selector, and during initialization of a NioServerSocketChannel, I’m going to create a Channel and a Channel pipeline, so how does a Channel and a Selector connect together, and the trick is in the bind method, the bind method, the doBind method that’s really doing the work

Private ChannelFuture doBind(final SocketAddress localAddress) {// Create a Channel object This is an asynchronous operation final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); If (regfuture.cause ()! = null) { return regFuture; If (regfuture.isdone ()) {// At this point we know that the registration was complete and  successful. ChannelPromise promise = channel.newPromise(); // doBind doBind0(regFuture, channel, localAddress, Promise); return promise; } else {// If the initAndRegister method is not completed, add a listener, wait for the completion of the callback below the method, the idea is the same as the above code to determine whether an exception is generated, If there is no exception for doBind operation final PendingRegistrationPromise promise = new PendingRegistrationPromise (channel); regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws  Exception { Throwable cause = future.cause(); if (cause ! = null) { promise.setFailure(cause); } else { promise.registered(); doBind0(regFuture, channel, localAddress, promise); }}}); return promise; }}Copy the code

InitAndRegister () is an asynchronous method, and an EventLoop is bound to only one Thread during its lifetime. A Channel is actually registered with the Selector of that EventLoop. The registration action can only be performed by the Thread bound to the EventLoop (why? If every thread can perform this operation, then the problem of multithreading arises. Netty needs to consider various locks to solve the problem of multithreading. EventLoop (); execute(); execute(); eventLoop(); execute(); In the doBind method, initAndRegister() is an asynchronous method, and the async execution of the method is determined later. If complete, the doBind0 method is executed directly, if not, it waits for a listener callback

final ChannelFuture initAndRegister() { Channel channel = null; Try {/ / above the bootstrap. Channel (NioServerSocketChannel. Class) set parameters will be set when ReflectiveChannelFactory / / what kind of object, specializing in the production of here is the place where the specific use to the, Using the reflection of the way a NioServerSocketChannel object instance channel = channelFactory. NewChannel (); Init (Channel); } catch (Throwable t) { ...... } ChannelFuture regFuture = config().group().register(Channel); if (regFuture.cause() ! = null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } return regFuture; }Copy the code

Take a closer look at the NioServerSocketChannel initialization process, which has some key Settings

ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); / / set to non-blocking serverSocketChannel. ConfigureBlocking (false); / / channel registration to the selector, concerned about the events for OP_ACCEPT serverSocketChannel. Register (selector, SelectionKey. OP_ACCEPT);Copy the code

In Netty, the setting is in the parent class of NioServerSocketChannel. During NioServerSocketChannel initialization, the parent class is initialized layer by layer. Let’s first look at the inheritance diagram for this class

Protected AbstractNioChannel(Channel parent, SelectableChannel, int readInterestOp) {super(parent); this.ch = ch; OP_ACCEPT this. ReadInterestOp = readInterestOp; Try {// Set ch.configureblocking (false); } catch (IOException e) { try { ch.close(); } catch (IOException e2) { logger.warn( "Failed to close a partially initialized socket.", e2); } throw new ChannelException("Failed to enter non-blocking mode.", e); } } protected AbstractChannel(Channel parent) { this.parent = parent; id = newId(); unsafe = newUnsafe(); Pipeline = newChannelPipeline(); // Channel = newChannelPipeline(); } protected DefaultChannelPipeline newChannelPipeline() { return new DefaultChannelPipeline(this); }Copy the code

Going back to the initAndRegister() method, what does init() do

@override void init(Channel Channel) {// Set the configured option parameter setChannelOptions(Channel, newOptionsArray(), Logger); setAttributes(channel, newAttributesArray()); // NioServerSocketChannel initialization, ChannelPipeline p = channel.pipeline(); final EventLoopGroup currentChildGroup = childGroup; final ChannelHandler currentChildHandler = childHandler; final Entry<ChannelOption<? >, Object>[] currentChildOptions = newOptionsArray(childOptions); final Entry<AttributeKey<? >, Object>[] currentChildAttrs = newAttributesArray(childAttrs); // When NioServerSocketChannel is registered with EventLoop, This method is called p.addlast (new ChannelInitializer<Channel>() {@override public void initChannel(final Channel ch) {final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); // If handler is not null, add it to if (handler! = null) { pipeline.addLast(handler); } // Add ServerBootstrapAcceptor when pipeline adds ServerBootstrapAcceptor // To ensure that the add operation is performed in the EventLoop thread, The add operation is wrapped as a task submitted to the EventLoop thread. Ch.eventloop ().execute(new Runnable() {@override public void run() {pipeline.addlast (new) ServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); }}); }}); }Copy the code

Go back to the initAndRegister() method and continue with the registration operation

Override public ChannelFuture Register (Channel Channel) {Override public ChannelFuture Register (Channel Channel) {Override public ChannelFuture Register (Channel Channel) {Override public ChannelFuture Register (Channel Channel) {Override public ChannelFuture Register (Channel Channel) { DefaultChannelPromise implements ChannelPromise and has // ChannelFuture and Promise features. Return register(new DefaultChannelPromise(channel, this)); }Copy the code
@Override public final void register(EventLoop eventLoop, final ChannelPromise promise) { ObjectUtil.checkNotNull(eventLoop, "eventLoop"); // If the Channel is registered, If (isRegistered()) {promise.setFailure(new IllegalStateException(" Registered to an event loop Already ")); return; } if (! isCompatible(eventLoop)) { promise.setFailure( new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName())); return; } AbstractChannel.this.eventLoop = eventLoop; // An EventLoop is bound to only one Thread for its entire life cycle. This check is used to determine if the current Thread is bound to the EventLoop. If (eventloop.ineventLoop ()) {// Register register0(Promise); } else {try {// If the current thread is not a thread bound to EventLoop, Eventloop.execute (new Runnable() {@override public void run() {Override public void run() { register0(promise); }}); } catch (Throwable t) { logger.warn( "Force-closing a channel whose registration task was not accepted by an event loop:  {}", AbstractChannel.this, t); closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); }}}Copy the code
Private void register0(ChannelPromise Promise) {try {// Check whether the channel is open if (! promise.setUncancellable() || ! ensureOpen(promise)) { return; } boolean firstRegistration = neverRegistered; DoRegister (); // Set the identifier bit to prevent repeated registration neverRegistered = false; registered = true; / / here is executed before the init () method to add into the Channel of initialization Handler pipeline. InvokeHandlerAddedIfNeeded (); SafeSetSuccess (promise); pipeline.fireChannelRegistered(); if (isActive()) { if (firstRegistration) { pipeline.fireChannelActive(); } else if (config().isAutoRead()) { beginRead(); } } } catch (Throwable t) { // Close the channel directly to avoid FD leak. closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); }}Copy the code
@Override protected void doRegister() throws Exception { boolean selected = false; for (;;) {try {// register a Channel with Selector // Note: SelectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); return; } catch (CancelledKeyException e) { if (! selected) { // Force the Selector to select now as the "canceled" SelectionKey may still be // cached and not removed because no Select.select(..) operation was called yet. eventLoop().selectNow(); selected = true; } else { // We forced a select operation on the selector before but the SelectionKey is still cached // for whatever reason. JDK bug ? throw e; }}}}Copy the code

At this point, the Channel is finally registered with the Selector, but it’s set to focus on event 0, and in NIO code, it’s set to focus on event selectionKey.op_accept, where is that set? Let’s move on

After looking at initAndRegister(), we’re going to step back into the doBind method, which has a key method, doBind0

@Override public final void bind(final SocketAddress localAddress, final ChannelPromise promise) { assertEventLoop(); . boolean wasActive = isActive(); Try {// doBind(localAddress); } catch (Throwable t) { safeSetFailure(promise, t); closeIfClosed(); return; } if (! WasActive &&isActive ()) {invokeLater(new Runnable() {@override public void run() {// To change the attention event pipeline.fireChannelActive(); }}); } safeSetSuccess(promise); }Copy the code

Because the code’s call chain is very long, let’s go straight to the key code

@override protected void doBind(SocketAddress localAddress) throws Exception {// Check whether the Java version is greater than 7 if (PlatformDependent. JavaVersion () > = 7) {/ / bind port javaChannel (). The bind (localAddress, config. GetBacklog ()); } else { javaChannel().socket().bind(localAddress, config.getBacklog()); }}Copy the code
@Override public void channelActive(ChannelHandlerContext ctx) { ctx.fireChannelActive(); // go to readIfIsAutoRead(); } @Override protected void doBeginRead() throws Exception { // Channel.read() or ChannelHandlerContext.read() was called  final SelectionKey selectionKey = this.selectionKey; if (! selectionKey.isValid()) { return; } readPending = true; final int interestOps = selectionKey.interestOps(); If ((interestOps & readInterestOp) = = 0) {/ / here complete attention event change selectionKey. InterestOps (interestOps | readInterestOp); }}Copy the code

So far, a few lines of code that we used to write in NIO are all present in Netty, which, I have to say, is very well wrapped.

ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); Selector selector = Selector.open(); / / listen on port 6666 serverSocketChannel. Socket (), bind (new InetSocketAddress (6666)); serverSocketChannel.configureBlocking(false); / / channel registration to the selector, concerned about the events for OP_ACCEPT serverSocketChannel. Register (selector, SelectionKey. OP_ACCEPT);Copy the code

The resources

www.jianshu.com/p/47fbe16ec…