Netty source code analysis series

  • Netty source code parsing series – server start process parsing
  • Netty source code parsing series – client connection access and read I/O parsing
  • 5 minutes to understand pipeline model -Netty source code parsing

1. Server startup example (based on 4.0.31.Final)

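import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.timeout.IdleStateHandler;

// InitHandler and IOHandler are the author's own ChannelHandler classes (not shown here).
public class Server {
    private ServerBootstrap serverBootstrap;
    private NioEventLoopGroup bossGroup;
    private NioEventLoopGroup workGroup;

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Service startup");
        Server server = new Server();
        server.start();
    }

    private void start() throws InterruptedException {
        try {
            serverBootstrap = new ServerBootstrap();
            bossGroup = new NioEventLoopGroup();     // accepts client connections
            workGroup = new NioEventLoopGroup(4);    // handles I/O of accepted connections
            serverBootstrap.group(bossGroup, workGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .handler(new InitHandler())
                    .childHandler(new IOChannelInitialize());
            ChannelFuture future = serverBootstrap.bind(8802).sync();
            future.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }

    private class IOChannelInitialize extends ChannelInitializer<SocketChannel> {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            System.out.println("initChannel");
            // Reader idle time of 1000 seconds; writer-idle and all-idle checks are disabled (0).
            ch.pipeline().addLast(new IdleStateHandler(1000, 0, 0));
            ch.pipeline().addLast(new IOHandler());
        }
    }
}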

Step-by-step instructions

  • 1.1 Create a ServerBootstrap instance. ServerBootstrap is Netty's bootstrap helper class; it provides a series of methods for setting server startup parameters. It uses the facade pattern to abstract and encapsulate the underlying capabilities, so users rarely have to deal with the low-level APIs, which lowers the development difficulty.

  • 1.2 NioEventLoopGroup is Netty's Reactor thread pool. bossGroup listens for and accepts client connections; workGroup handles I/O and encoding/decoding.

  • 1.3 Bind the server channel type, NioServerSocketChannel.

  • 1.4 Set some parameters.

  • 1.5 Initialize the pipeline and bind handlers. A pipeline is a chain of responsibility that handles network events and manages and executes ChannelHandlers. Here it is set up with the IdleStateHandler provided by Netty and a custom IOHandler.

  • 1.6 serverBootstrap.bind(8802) is where the server actually starts and binds the port.

  • 1.7 future.channel().closeFuture().sync() waits for the server channel to close.

  • 1.8 Shut down gracefully.

2. Source code analysis

2.1 NioEventLoopGroup

NioEventLoopGroup is more than a pool of I/O threads: besides I/O reads and writes, its threads also run system tasks and scheduled tasks. Start with its constructors:

public NioEventLoopGroup(int nThreads) {
    this(nThreads, null);
}

public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
    this(nThreads, threadFactory, SelectorProvider.provider());
}

public NioEventLoopGroup(
        int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider) {
    super(nThreads, threadFactory, selectorProvider);
}

protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
    super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);
}

Moving on, here’s the simplified code

protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
    ...
    if (threadFactory == null) {
        threadFactory = newDefaultThreadFactory();
    }
    children = new SingleThreadEventExecutor[nThreads];
    if (isPowerOfTwo(children.length)) {
        chooser = new PowerOfTwoEventExecutorChooser();
    } else {
        chooser = new GenericEventExecutorChooser();
    }
    for (int i = 0; i < nThreads; i ++) {
        ...
        children[i] = newChild(threadFactory, args);
        ...
    }
    ...
}
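
The constructor above picks one of two choosers depending on whether the number of children is a power of two. The article does not show the chooser bodies, so the following is only a rough, self-contained sketch of the idea: a round-robin pick that uses a bit mask when the length is a power of two and a modulo otherwise. ChooserSketch, its String children, and the exact arithmetic are assumptions for illustration, not Netty's classes.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch only: ChooserSketch is a hypothetical name, and the
// String array stands in for the array of child event executors.
final class ChooserSketch {
    private final AtomicInteger childIndex = new AtomicInteger();
    private final String[] children;

    ChooserSketch(String... children) {
        if (children.length == 0) {
            throw new IllegalArgumentException("at least one child is required");
        }
        this.children = children;
    }

    // True when exactly one bit is set (for positive values).
    static boolean isPowerOfTwo(int val) {
        return (val & -val) == val;
    }

    // Round-robin pick: bit mask for power-of-two lengths, modulo otherwise.
    String next() {
        int i = childIndex.getAndIncrement();
        if (isPowerOfTwo(children.length)) {
            return children[i & (children.length - 1)];
        }
        return children[Math.abs(i % children.length)];
    }

    public static void main(String[] args) {
        ChooserSketch four = new ChooserSketch("loop-0", "loop-1", "loop-2", "loop-3");
        ChooserSketch three = new ChooserSketch("loop-0", "loop-1", "loop-2");
        for (int n = 0; n < 5; n++) {
            System.out.println(four.next() + " / " + three.next());
        }
    }
}
```

Both branches produce the same round-robin order; the mask simply avoids a division when the pool size allows it.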

MultithreadEventExecutorGroup creates the child executors (one thread each) and the chooser that selects among them. Next, look at the newChild method (implemented in NioEventLoopGroup), which instantiates each child:

@Override
protected EventExecutor newChild(
        ThreadFactory threadFactory, Object... args) throws Exception {
    return new NioEventLoop(this, threadFactory, (SelectorProvider) args[0]);
}

A NioEventLoop is created

NioEventLoop(NioEventLoopGroup parent, ThreadFactory threadFactory, SelectorProvider selectorProvider) {
    super(parent, threadFactory, false);
    if (selectorProvider == null) {
        throw new NullPointerException("selectorProvider");
    }
    provider = selectorProvider;
    selector = openSelector();
}

Follow the super() call:

protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
    super(parent, threadFactory, addTaskWakesUp);
}

The code below is simplified. Keep following the super() call:

protected SingleThreadEventExecutor(
        EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
    ...
    thread = threadFactory.newThread(new Runnable() {
        @Override
        public void run() {
            SingleThreadEventExecutor.this.run();
        }
    });
    ...
}

A thread is instantiated here, and its run() calls SingleThreadEventExecutor.this.run(). At this point the thread is only created, not started; where it gets started is covered further down.

To summarize: NioEventLoopGroup is in effect a Reactor thread pool. It schedules and executes client connection access, network read and write events, user-defined tasks, and scheduled tasks.

2.2 ServerBootstrap

ServerBootstrap is the server startup helper class. The parent class is AbstractBootstrap, and the corresponding client startup helper class is Bootstrap

public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
    // The fields below are declared in the parent class AbstractBootstrap<B, C>
    // and are listed here for reference.
    volatile EventLoopGroup group;
    private volatile ChannelFactory<? extends C> channelFactory;
    private volatile SocketAddress localAddress;
    private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<ChannelOption<?>, Object>();
    private final Map<AttributeKey<?>, Object> attrs = new LinkedHashMap<AttributeKey<?>, Object>();
    private volatile ChannelHandler handler;
}

2.2.1 Setting the boss and worker thread pools

bossGroup is passed to the parent class, and workGroup is assigned to ServerBootstrap's childGroup field

public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
    super.group(parentGroup);
    if (childGroup == null) {
        throw new NullPointerException("childGroup");
    }
    if (this.childGroup != null) {
        throw new IllegalStateException("childGroup set already");
    }
    this.childGroup = childGroup;
    return this;
}

2.2.2 Setting NioServerSocketChannel to process connection requests

serverBootstrap.channel(NioServerSocketChannel.class)

public B channel(Class<? extends C> channelClass) {
    if (channelClass == null) {
        throw new NullPointerException("channelClass");
    }
    return channelFactory(new BootstrapChannelFactory<C>(channelClass));
}

Continue with new BootstrapChannelFactory

private static final class BootstrapChannelFactory<T extends Channel> implements ChannelFactory<T> {
    private final Class<? extends T> clazz;

    BootstrapChannelFactory(Class<? extends T> clazz) {
        this.clazz = clazz;
    }

    @Override
    public T newChannel() {
        try {
            return clazz.newInstance();
        } catch (Throwable t) {
            throw new ChannelException("Unable to create Channel from class " + clazz, t);
        }
    }
}

BootstrapChannelFactory is a static nested class that implements ChannelFactory. As the name suggests, it is a channel factory: its newChannel method creates an instance of NioServerSocketChannel through reflection. Where newChannel gets called is shown later, in initAndRegister().
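
To make the reflection step concrete, here is a minimal, self-contained sketch of the same pattern outside Netty. ReflectiveFactory and DemoChannel are hypothetical names, and the sketch uses getDeclaredConstructor().newInstance() rather than the Class.newInstance() call shown in the article, because the latter is deprecated on newer JDKs.

```java
// Illustrative sketch only: ReflectiveFactory and DemoChannel are hypothetical
// names and are not part of Netty.
final class ReflectiveFactory<T> {
    private final Class<? extends T> clazz;

    ReflectiveFactory(Class<? extends T> clazz) {
        if (clazz == null) {
            throw new NullPointerException("clazz");
        }
        this.clazz = clazz;
    }

    // Creates a fresh instance through the no-argument constructor.
    T newInstance() {
        try {
            return clazz.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Unable to create instance of " + clazz, e);
        }
    }

    // Stand-in for a channel class that has a no-argument constructor.
    static final class DemoChannel {
        DemoChannel() { }
    }

    public static void main(String[] args) {
        ReflectiveFactory<DemoChannel> factory =
                new ReflectiveFactory<DemoChannel>(DemoChannel.class);
        DemoChannel first = factory.newInstance();
        DemoChannel second = factory.newInstance();
        System.out.println("distinct instances: " + (first != second));
    }
}
```

The factory only stores the Class object; nothing is instantiated until newInstance() is called, which mirrors how the channel class passed to channel() is not instantiated until bind time.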

2.2.3 Setting channel option values

serverBootstrap.option(ChannelOption.SO_BACKLOG, 128)

public <T> B option(ChannelOption<T> option, T value) {
    if (option == null) {
        throw new NullPointerException("option");
    }
    if (value == null) {
        synchronized (options) {
            options.remove(option);
        }
    } else {
        synchronized (options) {
            options.put(option, value);
        }
    }
    return (B) this;
}

The option method is defined in the parent class AbstractBootstrap. options is a LinkedHashMap, which keeps insertion order (it is backed by a doubly linked list) but is not thread-safe, so every access is synchronized.
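
The set-or-remove behaviour is easy to try in isolation. The sketch below is a simplified stand-in, not Netty code: OptionStore is a hypothetical class, and plain String keys replace ChannelOption.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative sketch only: OptionStore is a hypothetical name, and String
// keys stand in for ChannelOption instances.
final class OptionStore {
    private final Map<String, Object> options = new LinkedHashMap<String, Object>();

    // A null value removes the option; any other value sets or overwrites it.
    OptionStore option(String name, Object value) {
        if (name == null) {
            throw new NullPointerException("name");
        }
        synchronized (options) {
            if (value == null) {
                options.remove(name);
            } else {
                options.put(name, value);
            }
        }
        return this;
    }

    // Returns a copy so callers can iterate without holding the lock.
    Map<String, Object> snapshot() {
        synchronized (options) {
            return new LinkedHashMap<String, Object>(options);
        }
    }

    public static void main(String[] args) {
        OptionStore store = new OptionStore()
                .option("SO_BACKLOG", 128)
                .option("SO_KEEPALIVE", true)
                .option("SO_BACKLOG", null);   // removes SO_BACKLOG again
        System.out.println(store.snapshot());
    }
}
```

Returning this from option() is what allows the chained configuration style used in the startup example.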

2.2.4 serverBootstrap.childOption

public <T> ServerBootstrap childOption(ChannelOption<T> childOption, T value) {
    if (childOption == null) {
        throw new NullPointerException("childOption");
    }
    if (value == null) {
        synchronized (childOptions) {
            childOptions.remove(childOption);
        }
    } else {
        synchronized (childOptions) {
            childOptions.put(childOption, value);
        }
    }
    return this;
}

childOption() is a method of the subclass ServerBootstrap. option() sets options on the ServerChannel itself, while childOption() sets options on the child channels that the ServerChannel accepts. In other words, option applies to the boss thread's channel and childOption applies to the channels handled by the worker thread pool.

2.2.5 Setting the handler of the server NioServerSocketChannel

serverBootstrap.handler(new InitHandler())

public B handler(ChannelHandler handler) {
    if (handler == null) {
        throw new NullPointerException("handler");
    }
    this.handler = handler;
    return (B) this;
}

2.2.6 serverBootstrap.childHandler()

public ServerBootstrap childHandler(ChannelHandler childHandler) {
    if (childHandler == null) {
        throw new NullPointerException("childHandler");
    }
    this.childHandler = childHandler;
    return this;
}

handler belongs to the server-side NioServerSocketChannel, which is created only once. childHandler belongs to each newly accepted NioSocketChannel and is applied every time a client connection comes in.

2.2.7 The actual startup happens here: the bind() method

serverBootstrap.bind(8802).sync()

public ChannelFuture bind(int inetPort) {
    return bind(new InetSocketAddress(inetPort));
}
  • (1) Create an InetSocketAddress from the port number and continue to bind
public ChannelFuture bind(SocketAddress localAddress) {
    validate();
    if (localAddress == null) {
        throw new NullPointerException("localAddress");
    }
    return doBind(localAddress);
}
  • (2) The validate() method performs some parameter checks; we go straight to doBind()
private ChannelFuture doBind(final SocketAddress localAddress) {
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }
    if (regFuture.isDone()) {
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if (cause != null) {
                    promise.setFailure(cause);
                } else {
                    promise.executor = channel.eventLoop();
                }
                doBind0(regFuture, channel, localAddress, promise);
            }
        });
        return promise;
    }
}
  • (3.1) First look at initAndRegister() (in the AbstractBootstrap class), with some unimportant code left out
final ChannelFuture initAndRegister() {
    final Channel channel = channelFactory().newChannel();
    init(channel);
    ChannelFuture regFuture = group().register(channel);
    return regFuture;
}

The channelFactory was created in serverBootstrap.channel(). Its newChannel() is called here and uses reflection to create the NioServerSocketChannel instance.

  • (3.2.1) Take a look at the init(channel) method (in the ServerBootstrap class)
@Override
void init(Channel channel) throws Exception {
    final Map<ChannelOption<?>, Object> options = options();
    synchronized (options) {
        channel.config().setOptions(options);
    }

    final Map<AttributeKey<?>, Object> attrs = attrs();
    synchronized (attrs) {
        for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
            @SuppressWarnings("unchecked")
            AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
            channel.attr(key).set(e.getValue());
        }
    }

    ChannelPipeline p = channel.pipeline();

    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
    final Entry<ChannelOption<?>, Object>[] currentChildOptions;
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
    synchronized (childOptions) {
        currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
    }
    synchronized (childAttrs) {
        currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
    }

    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(Channel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }
            pipeline.addLast(new ServerBootstrapAcceptor(
                    currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
        }
    });
}

options() is a method of AbstractBootstrap; it returns the map that was populated by ServerBootstrap.option(). p.addLast() adds a new handler to the pipeline of the NioServerSocketChannel. The pipeline is similar to a Servlet filter chain and manages all the handlers.

  • (3.2.2) Take a look at the group().register() method

    Here group() is bossGroup (NioEventLoopGroup extends MultithreadEventLoopGroup). After several hops the call reaches the register() method of the SingleThreadEventLoop class
@Override
public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
    if (channel == null) {
        throw new NullPointerException("channel");
    }
    if (promise == null) {
        throw new NullPointerException("promise");
    }

    channel.unsafe().register(this, promise);
    return promise;
}

  • (3.2.3) With some unimportant code removed, below is the real registration
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    AbstractChannel.this.eventLoop = eventLoop;
    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        try {
            eventLoop.execute(new OneTimeTask() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            // error handling omitted
        }
    }
}

eventLoop.inEventLoop() checks whether the current thread is the event loop's own thread. If it is, register0 runs directly. If it is not, either because the event loop thread has not been started yet or because the caller is a different thread, the registration is submitted to the event loop as a task.

  • (3.2.4) The thread has not started yet, so we go into eventLoop.execute(). The execute() method belongs to the SingleThreadEventExecutor class
@Override
public void execute(Runnable task) {
    if (task == null) {
        throw new NullPointerException("task");
    }

    boolean inEventLoop = inEventLoop();
    if (inEventLoop) {
        addTask(task);
    } else {
        startThread();
        addTask(task);
        if (isShutdown() && removeTask(task)) {
            reject();
        }
    }

    if (!addTaskWakesUp && wakesUpForTask(task)) {
        wakeup(inEventLoop);
    }
}

  • (3.2.5) Start thread
private void startThread() {
    if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
        if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
            thread.start();
        }
    }
}

The thread that was created in the SingleThreadEventExecutor constructor at the beginning of 2.1 is started here. Let's go back to that constructor:

protected SingleThreadEventExecutor(
        EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
    ...
    thread = threadFactory.newThread(new Runnable() {
        @Override
        public void run() {
            SingleThreadEventExecutor.this.run();
        }
    });
    ...
}
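
The pattern across execute(), startThread() and the constructor is "create the thread eagerly, start it lazily on the first submitted task, and guard the start with a compare-and-set". A much reduced, self-contained sketch of that pattern follows. LazyStartExecutor is a hypothetical class: it uses an AtomicInteger instead of a field updater and a plain blocking queue instead of Netty's task queue and selector loop.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch only: LazyStartExecutor is a hypothetical, much reduced stand-in.
final class LazyStartExecutor {
    private static final int ST_NOT_STARTED = 1;
    private static final int ST_STARTED = 2;

    private final AtomicInteger state = new AtomicInteger(ST_NOT_STARTED);
    private final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<Runnable>();
    private final Thread thread;

    LazyStartExecutor() {
        // The thread is created here but not started.
        thread = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    for (;;) {
                        tasks.take().run();
                    }
                } catch (InterruptedException e) {
                    // interrupted: leave the loop
                }
            }
        });
        thread.setDaemon(true);
    }

    boolean isStarted() {
        return state.get() == ST_STARTED;
    }

    void execute(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }
        if (Thread.currentThread() != thread) {
            startThread();               // first outside caller starts the loop
        }
        tasks.add(task);
    }

    private void startThread() {
        // Only one caller can win the CAS, so start() runs at most once.
        if (state.compareAndSet(ST_NOT_STARTED, ST_STARTED)) {
            thread.start();
        }
    }

    // Returns true if the thread started only on execute() and the task ran.
    static boolean demo() {
        LazyStartExecutor executor = new LazyStartExecutor();
        boolean startedBefore = executor.isStarted();
        final CountDownLatch ran = new CountDownLatch(1);
        executor.execute(new Runnable() {
            @Override
            public void run() {
                ran.countDown();
            }
        });
        boolean taskRan;
        try {
            taskRan = ran.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            taskRan = false;
        }
        return !startedBefore && executor.isStarted() && taskRan;
    }

    public static void main(String[] args) {
        System.out.println("lazy start worked: " + demo());
    }
}
```

Calling execute() a second time leaves the state at ST_STARTED, so the CAS fails and the thread is not started twice.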
  • (3.2.6) Open SingleThreadEventExecutor.this.run(). For a NioEventLoop this is the following run() method
@Override
protected void run() {
    for (;;) {
        boolean oldWakenUp = wakenUp.getAndSet(false);
        try {
            if (hasTasks()) {
                selectNow();
            } else {
                select(oldWakenUp);
                if (wakenUp.get()) {
                    selector.wakeup();
                }
            }

            cancelledKeys = 0;
            needsToSelectAgain = false;
            final int ioRatio = this.ioRatio;
            if (ioRatio == 100) {
                processSelectedKeys();
                runAllTasks();
            } else {
                final long ioStartTime = System.nanoTime();
                processSelectedKeys();
                final long ioTime = System.nanoTime() - ioStartTime;
                runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
            }

            if (isShuttingDown()) {
                closeAll();
                if (confirmShutdown()) {
                    break;
                }
            }
        } catch (Throwable t) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // ignored
            }
        }
    }
}

This loop runs asynchronously on the event loop thread. Each iteration polls the selector for ready events such as client accepts, handles them in processSelectedKeys(), and then runs the queued tasks in runAllTasks().
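
The expression ioTime * (100 - ioRatio) / ioRatio in the loop decides how long tasks may run relative to the time just spent on I/O. A small worked example of that arithmetic is sketched below; IoRatioBudget is a hypothetical helper, and the sample values are chosen only for illustration.

```java
// Illustrative sketch only: IoRatioBudget is a hypothetical helper that
// reproduces the arithmetic from the run() loop, nothing more.
final class IoRatioBudget {
    // Time allowed for tasks after I/O took ioTimeNanos, for ioRatio in 1..99.
    static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        if (ioRatio <= 0 || ioRatio >= 100) {
            throw new IllegalArgumentException("ioRatio must be in 1..99: " + ioRatio);
        }
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }

    public static void main(String[] args) {
        long ioTime = 4000000L;   // assume I/O processing took 4 ms
        int[] ratios = {20, 50, 80};
        for (int ratio : ratios) {
            System.out.println("ioRatio=" + ratio
                    + " -> task budget " + taskBudgetNanos(ioTime, ratio) + " ns");
        }
    }
}
```

With 4 ms of I/O time, a ratio of 50 gives tasks the same 4 ms, a ratio of 80 gives them 1 ms, and a ratio of 20 gives them 16 ms. A ratio of 100 takes the other branch in the loop, where runAllTasks() is called without a time limit.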

  • (3.3) Go back to ChannelFuture regFuture = group().register(channel); in (3.1). The call jumps to the register method of SingleThreadEventLoop
@Override
public ChannelFuture register(Channel channel) {
    ...
    channel.unsafe().register(this, promise);
    return promise;
}

Here is the simplified code (AbstractUnsafe inner class in the AbstractChannel class)

@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    ...
    eventLoop.execute(new OneTimeTask() {
        @Override
        public void run() {
            register0(promise);
        }
    });
    ...
}

private void register0(ChannelPromise promise) {
    ...
    doRegister();
    ...
    if (firstRegistration && isActive()) {
        pipeline.fireChannelActive();
    }
    ...
}

Continue (in AbstractNioChannel class)

@Override
protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        ...
        selectionKey = javaChannel().register(eventLoop().selector, 0, this);
        ...
    }
}

This registers the NioServerSocketChannel with the Selector of the boss group's NioEventLoop. One might expect OP_ACCEPT (16) to be registered on the multiplexer here, yet the interest set passed is 0, which means the channel is registered without listening for any operation. There are two reasons: (1) the registration method is polymorphic. NioServerSocketChannel uses it to listen for client connections, and the other channel types built on AbstractNioChannel go through the same method. (2) The interestOps(int ops) method of SelectionKey makes it easy to modify the interest set afterwards.

Next, look at pipeline.fireChannelActive()

@Override
public ChannelPipeline fireChannelActive() {
    head.fireChannelActive();

    if (channel.config().isAutoRead()) {
        channel.read();
    }

    return this;
}

@Override
public Channel read() {
    pipeline.read();
    return this;
}

@Override
public ChannelPipeline read() {
    tail.read();
    return this;
}

@Override
public ChannelHandlerContext read() {
    ...
    next.invokeRead();
    ...
}

private void invokeRead() {
    try {
        ((ChannelOutboundHandler) handler()).read(this);
    } catch (Throwable t) {
        notifyHandlerException(t);
    }
}

Go into the read of HeadContext

@Override
public void read(ChannelHandlerContext ctx) {
    unsafe.beginRead();
}

@Override
public final void beginRead() {
    ...
    doBeginRead();
    ...
}

@Override
protected void doBeginRead() throws Exception {
    if (inputShutdown) {
        return;
    }

    final SelectionKey selectionKey = this.selectionKey;
    if (!selectionKey.isValid()) {
        return;
    }
    readPending = true;

    final int interestOps = selectionKey.interestOps();
    if ((interestOps & readInterestOp) == 0) {
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}

Finally, the selectionKey's interest set is changed here by OR-ing in readInterestOp. For NioServerSocketChannel, readInterestOp is OP_ACCEPT, so from this point on the server channel listens for incoming connections.
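
The two-step sequence, registering with an interest set of 0 and enabling the real interest later, can be reproduced with plain JDK NIO. The sketch below is only an illustration of that sequence: InterestOpsSketch is a hypothetical class, and the local variable readInterestOp is named after the Netty field but is just a constant here.

```java
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

// Illustrative sketch only: InterestOpsSketch is a hypothetical name; it uses
// plain JDK NIO and no Netty classes.
final class InterestOpsSketch {
    // Returns {interest ops right after registration, interest ops after enabling accept}.
    static int[] demo() {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false);

            // Step 1: register with an empty interest set (the doRegister() step).
            SelectionKey key = server.register(selector, 0);
            int before = key.interestOps();

            // Step 2: OR in the wanted operation later (the doBeginRead() step).
            int readInterestOp = SelectionKey.OP_ACCEPT;
            int interestOps = key.interestOps();
            if ((interestOps & readInterestOp) == 0) {
                key.interestOps(interestOps | readInterestOp);
            }
            return new int[] { before, key.interestOps() };
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        int[] ops = demo();
        System.out.println("after register: " + ops[0] + ", after enabling accept: " + ops[1]);
    }
}
```

Because the second step only ORs a bit into the existing interest set, the same code path works for any operation bit, which is the flexibility the registration-with-0 approach is after.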

  • (4) Take a look at the doBind0() method
private static void doBind0(
        final ChannelFuture regFuture, final Channel channel,
        final SocketAddress localAddress, final ChannelPromise promise) {
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}

The bind is submitted as a task to the task queue of the channel's event loop (a Reactor thread). The task checks whether the registration succeeded and, if so, continues with channel.bind().

  • (5) Execute the bind() method
@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    return pipeline.bind(localAddress, promise);
}

@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    return tail.bind(localAddress, promise);
}

@Override
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
    ...
    final AbstractChannelHandlerContext next = findContextOutbound();
    EventExecutor executor = next.executor();
    ...
    next.invokeBind(localAddress, promise);
    ...
}

Because bind is an outbound event, the pipeline starts at the tail, looks for the next outbound handler, and calls its invokeBind() method
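
The tail-to-head search for outbound handlers can be pictured with a small doubly linked list. The sketch below is a simplified model and not Netty's pipeline: OutboundWalkSketch, Ctx, and the handler names are hypothetical, and every outbound handler is assumed to simply pass the bind on to the next outbound handler.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only: OutboundWalkSketch and Ctx are hypothetical names.
final class OutboundWalkSketch {
    static final class Ctx {
        final String name;
        final boolean outbound;
        Ctx prev;
        Ctx next;

        Ctx(String name, boolean outbound) {
            this.name = name;
            this.outbound = outbound;
        }
    }

    private final Ctx head = new Ctx("head", true);    // head handles outbound operations
    private final Ctx tail = new Ctx("tail", false);   // tail is inbound only

    OutboundWalkSketch() {
        head.next = tail;
        tail.prev = head;
    }

    // Inserts a context just before the tail.
    OutboundWalkSketch addLast(String name, boolean outbound) {
        Ctx ctx = new Ctx(name, outbound);
        Ctx last = tail.prev;
        ctx.prev = last;
        ctx.next = tail;
        last.next = ctx;
        tail.prev = ctx;
        return this;
    }

    // Walks backwards from ctx to the nearest outbound context.
    private static Ctx findContextOutbound(Ctx ctx) {
        do {
            ctx = ctx.prev;
        } while (!ctx.outbound);
        return ctx;
    }

    // Names of the contexts a bind() started at the tail would visit, in order.
    List<String> bindPath() {
        List<String> visited = new ArrayList<String>();
        Ctx ctx = tail;
        while (ctx != head) {
            ctx = findContextOutbound(ctx);
            visited.add(ctx.name);
        }
        return visited;
    }

    public static void main(String[] args) {
        OutboundWalkSketch pipeline = new OutboundWalkSketch()
                .addLast("decoder", false)
                .addLast("encoder", true)
                .addLast("business", false);
        System.out.println(pipeline.bindPath());
    }
}
```

Inbound-only contexts are skipped, and the walk always ends at the head, which is where the operation is handed to unsafe.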

private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
    try {
        ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
    } catch (Throwable t) {
        notifyOutboundHandlerException(t, promise);
    }
}

@Override
public void bind(
        ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
        throws Exception {
    unsafe.bind(localAddress, promise);
}

@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
    ...
    doBind(localAddress);
    ...
}

@Override
protected void doBind(SocketAddress localAddress) throws Exception {
    javaChannel().socket().bind(localAddress, config.getBacklog());
}

Summary of the ServerBootstrap startup process: (1) configure the Reactor thread pools, the server channel, some configuration parameters, and the handlers for client connections; (2) register the server channel and OP_ACCEPT on the multiplexer; (3) start the Reactor thread pool, which continuously listens for connections and processes tasks; (4) bind the port.