Netty threading model

Netty implements the Reactor threading model, which has four parts:

  1. Resources: the tasks to be processed, i.e. the requests coming from clients
  2. Synchronous event multiplexer: event polling, where the selector of the boss thread polls for client events
  3. Dispatcher: the boss thread assigns each client request to a worker thread for I/O processing
  4. Request handler: handles the client's I/O requests
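The four parts can be seen in miniature without Netty. The following is a minimal sketch using only the JDK, with an in-process Pipe standing in for a client connection; the class name MiniReactor and the method handleOnce are made up for this illustration and are not part of Netty.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.function.Function;

// Hypothetical illustration class, not a Netty type.
final class MiniReactor {

    /** Runs one select/dispatch cycle and returns what the handler produced. */
    static String handleOnce(String request, Function<String, String> handler) {
        try {
            Pipe pipe = Pipe.open();                          // 1. the resource
            try (Selector selector = Selector.open();         // 2. the event multiplexer
                 Pipe.SinkChannel sink = pipe.sink();
                 Pipe.SourceChannel source = pipe.source()) {
                source.configureBlocking(false);
                source.register(selector, SelectionKey.OP_READ);

                // The "client" sends a small request (must fit in the 256-byte buffer below).
                sink.write(ByteBuffer.wrap(request.getBytes(StandardCharsets.UTF_8)));

                selector.select();                            // block until an event is ready
                String result = null;
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {                        // 3. the dispatcher
                    SelectionKey key = it.next();
                    it.remove();
                    if (key.isReadable()) {
                        ByteBuffer buf = ByteBuffer.allocate(256);
                        ((Pipe.SourceChannel) key.channel()).read(buf);
                        buf.flip();
                        // 4. the request handler
                        result = handler.apply(StandardCharsets.UTF_8.decode(buf).toString());
                    }
                }
                return result;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(handleOnce("ping", String::toUpperCase));
    }
}
```

Netty splits the same loop across two groups: the boss group's selector accepts connections and the worker group's selectors handle reads and writes.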

Code sample

    static final int PORT = Integer.parseInt(System.getProperty("port", "8099"));

    public static void main(String[] args) {
        // Create the EventLoopGroups: one boss thread, default number of worker threads
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        final EchoServerHandler handler = new EchoServerHandler();
        try {
            // Create the bootstrap
            ServerBootstrap bootstrap = new ServerBootstrap();
            // Configure the bootstrap
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 100)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline p = socketChannel.pipeline();
                            p.addLast(handler);
                        }
                    });
            // Bind the port and start the server
            ChannelFuture f = bootstrap.bind(PORT).sync();
            // Block until the server channel is closed
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

NioEventLoopGroup initialization process

Step 1: Create a NioEventLoopGroup

EventLoopGroup workerGroup = new NioEventLoopGroup();

Step 2: Enter the NioEventLoopGroup constructors

public NioEventLoopGroup() {
    this(0);
}

public NioEventLoopGroup(int nThreads) {
    this(nThreads, (Executor)null);
}

public NioEventLoopGroup(int nThreads, Executor executor) {
    this(nThreads, executor, SelectorProvider.provider());
}

public NioEventLoopGroup(int nThreads, Executor executor, SelectorProvider selectorProvider) {
        this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}

public NioEventLoopGroup(int nThreads, Executor executor, SelectorProvider selectorProvider, SelectStrategyFactory selectStrategyFactory) {
        super(nThreads, executor, new Object[]{selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject()});
}

nThreads: the number of threads to create. It is 0 if the no-argument constructor is used; otherwise it is the value passed in.

executor: you can supply your own. If you do, it is not initialized later; if you do not, it defaults to null and is initialized later.

selectorProvider: obtained through SelectorProvider.provider(). A SelectorProvider is what creates DatagramChannel, Pipe, Selector, ServerSocketChannel, and SocketChannel instances, and it backs System.inheritedChannel().

selectStrategyFactory: DefaultSelectStrategyFactory.INSTANCE, the factory for the select strategy.

RejectedExecutionHandlers.reject(): the rejection policy, which is executed when a task is added while the task queue is already full.

Step 3: Enter the MultithreadEventLoopGroup class

This is the parent-class constructor. You can see that selectorProvider, selectStrategyFactory, and RejectedExecutionHandlers.reject() are passed along in the Object... args array. This constructor does one thing: if the nThreads passed in is 0, it uses the default DEFAULT_EVENT_LOOP_THREADS, which is twice the number of CPU cores; otherwise it uses the number passed in.

protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
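The thread-count rule is small enough to restate on its own. This is a sketch of the rule as described above, not Netty's source: ThreadCountDemo and resolveThreads are names invented here, and the sketch ignores the io.netty.eventLoopThreads system property that Netty also consults when computing the default.

```java
// Hypothetical illustration class, not a Netty type.
final class ThreadCountDemo {

    // Assumed default: twice the number of cores, and never less than 1.
    static final int DEFAULT_THREADS =
            Math.max(1, Runtime.getRuntime().availableProcessors() * 2);

    /** 0 means "use the default"; any other value is used as given. */
    static int resolveThreads(int nThreads) {
        return nThreads == 0 ? DEFAULT_THREADS : nThreads;
    }

    public static void main(String[] args) {
        System.out.println("no-arg constructor -> " + resolveThreads(0) + " threads");
        System.out.println("nThreads = 1       -> " + resolveThreads(1) + " thread");
    }
}
```

This is why the sample server passes 1 for the bossGroup, which only accepts connections, and nothing for the workerGroup.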

Step 4: Enter the MultithreadEventExecutorGroup class

DefaultEventExecutorChooserFactory.INSTANCE is the factory for the event executor chooser; the instance is created with new DefaultEventExecutorChooserFactory().

protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
        this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}

It then enters the following constructor. Because this part of the code is quite long, the source below has been abridged:

 protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) {
        this.terminatedChildren = new AtomicInteger();
        this.terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);
        if (nThreads <= 0) {
            throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
        } else {
            if (executor == null) {
                executor = new ThreadPerTaskExecutor(this.newDefaultThreadFactory());
            }

            this.children = new EventExecutor[nThreads];

            int j;
            for(int i = 0; i < nThreads; ++i) {
                boolean success = false;
                boolean var18 = false;

                try {
                    var18 = true;
                    this.children[i] = this.newChild((Executor)executor, args);
                    success = true;
                    var18 = false;
                } catch (Exception var19) {
                    throw new IllegalStateException("failed to create a child event loop", var19);
                } finally {
                    // abridged: if !success, the children created so far are shut down
                }
            }

            this.chooser = chooserFactory.newChooser(this.children);
        }
    }

this.newDefaultThreadFactory() creates a thread factory, which is used to create threads.

new ThreadPerTaskExecutor(...) creates the thread executor.

this.children = new EventExecutor[nThreads]; creates the children array, of size nThreads, that holds the NioEventLoops.

this.children[i] = this.newChild((Executor)executor, args); creates the NioEventLoops one by one in the loop.

this.chooser = chooserFactory.newChooser(this.children); creates the chooser for the thread executors.

1: newDefaultThreadFactory

protected ThreadFactory newDefaultThreadFactory() {
   return new DefaultThreadFactory(this.getClass());
}

newDefaultThreadFactory returns a DefaultThreadFactory, which implements the ThreadFactory interface. When its newThread() method is called, a thread is created and given a name of the form nioEventLoopGroup-x-y. The x identifies the thread group, such as the bossGroup or the workerGroup, and the y is the serial number of the thread within that group.
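The naming scheme can be reproduced with two counters. The following is a simplified sketch of the idea, assuming one global counter for groups and one per-factory counter for threads; NamedThreadFactory is a made-up class, not Netty's DefaultThreadFactory.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration class, not a Netty type.
final class NamedThreadFactory implements ThreadFactory {

    // Shared by all factories: gives each group its own number (the "x").
    private static final AtomicInteger POOL_ID = new AtomicInteger();

    // Per factory: numbers the threads inside one group (the "y").
    private final AtomicInteger nextId = new AtomicInteger();
    private final String prefix;

    NamedThreadFactory(String poolName) {
        this.prefix = poolName + '-' + POOL_ID.incrementAndGet() + '-';
    }

    @Override
    public Thread newThread(Runnable r) {
        return new Thread(r, prefix + nextId.incrementAndGet());
    }

    public static void main(String[] args) {
        NamedThreadFactory boss = new NamedThreadFactory("demoGroup");
        NamedThreadFactory worker = new NamedThreadFactory("demoGroup");
        System.out.println(boss.newThread(() -> { }).getName());
        System.out.println(worker.newThread(() -> { }).getName());
        System.out.println(worker.newThread(() -> { }).getName());
    }
}
```

Names built this way make thread dumps easier to read, because the group and the position within the group are both visible.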

2: ThreadPerTaskExecutor

public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
   if (threadFactory == null) {
        throw new NullPointerException("threadFactory");
    } else {
        this.threadFactory = threadFactory;
    }
}

public void execute(Runnable command) {
    this.threadFactory.newThread(command).start();
}

ThreadPerTaskExecutor implements the Executor interface. When its execute method is called, a thread is created and started, and only one thread is created for each NioEventLoop.
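A thread-per-task executor is only a few lines of plain Java. The sketch below mirrors the behavior described above; PerTaskExecutorDemo, PerTaskExecutor, and runAndGetThreadName are hypothetical names chosen for this example.

```java
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;

// Hypothetical illustration class, not a Netty type.
final class PerTaskExecutorDemo {

    /** Starts a brand-new thread for every task handed to execute. */
    static final class PerTaskExecutor implements Executor {
        private final ThreadFactory factory;

        PerTaskExecutor(ThreadFactory factory) {
            this.factory = Objects.requireNonNull(factory, "factory");
        }

        @Override
        public void execute(Runnable command) {
            factory.newThread(command).start();
        }
    }

    /** Runs one task through the executor and returns the name of the thread that ran it. */
    static String runAndGetThreadName(Executor executor) {
        String[] name = new String[1];
        CountDownLatch done = new CountDownLatch(1);
        executor.execute(() -> {
            name[0] = Thread.currentThread().getName();
            done.countDown();
        });
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return name[0];
    }

    public static void main(String[] args) {
        Executor executor = new PerTaskExecutor(r -> new Thread(r, "loop-1"));
        System.out.println("task ran on: " + runAndGetThreadName(executor));
    }
}
```

Because an event loop hands its long-running loop to the executor only once, "one thread per task" ends up meaning "one thread per event loop".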

3: Create one NioEventLoop after another

 private final EventExecutor[] children;
 private final EventExecutorChooser chooser;
 
 this.children = new EventExecutor[nThreads];
 int j;
 for(int i = 0; i < nThreads; ++i) {
       boolean success = false;
       boolean var18 = false;

       try {
           var18 = true;
           this.children[i] = this.newChild((Executor)executor, args);
           success = true;
           var18 = false;
       } catch (Exception var19) {
           throw new IllegalStateException("failed to create a child event loop", var19);
       } finally {
           // abridged: if !success, the children created so far are shut down
       }
 }

 this.chooser = chooserFactory.newChooser(this.children);

This creates an EventExecutor array whose size is the nThreads value passed in. The elements of the array are NioEventLoops. Each NioEventLoop in the array is initialized through newChild:

 protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        return new NioEventLoop(this, executor, (SelectorProvider)args[0], ((SelectStrategyFactory)args[1]).newSelectStrategy(), (RejectedExecutionHandler)args[2]);
    }

newChild, implemented in NioEventLoopGroup, returns a new NioEventLoop. Here is the NioEventLoop constructor:

NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
        super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
        if (selectorProvider == null) {
            throw new NullPointerException("selectorProvider");
        } else if (strategy == null) {
            throw new NullPointerException("selectStrategy");
        } else {
            this.provider = selectorProvider;
            NioEventLoop.SelectorTuple selectorTuple = this.openSelector();
            this.selector = selectorTuple.selector;
            this.unwrappedSelector = selectorTuple.unwrappedSelector;
            this.selectStrategy = strategy;
        }
}

openSelector() creates the multiplexer: selectorTuple.selector is the selector as modified by Netty, and unwrappedSelector is the original JDK one. The constructor also calls its parent-class constructor. Tracing into it:

 protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor, boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedExecutionHandler) {
        super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
        this.tailTasks = this.newTaskQueue(maxPendingTasks);
}

And from there into SingleThreadEventExecutor:

 protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedHandler) {
        super(parent);
        this.threadLock = new Semaphore(0);
        this.shutdownHooks = new LinkedHashSet();
        this.state = 1;
        this.terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);
        this.addTaskWakesUp = addTaskWakesUp;
        this.maxPendingTasks = Math.max(16, maxPendingTasks);
        this.executor = (Executor)ObjectUtil.checkNotNull(executor, "executor");
        this.taskQueue = this.newTaskQueue(this.maxPendingTasks);
        this.rejectedExecutionHandler = (RejectedExecutionHandler)ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
    }

Two queues are created: tailTasks and taskQueue. taskQueue holds the submitted tasks, and tailTasks holds the wrap-up tasks that run at the end of each event loop iteration.
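The interplay between the bounded taskQueue and the rejection handler can be sketched as follows. This is an assumption-laden simplification: it uses a LinkedBlockingQueue and throws directly instead of delegating to a handler object, and BoundedTaskQueueDemo, addTask, and pending are names invented for the example.

```java
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;

// Hypothetical illustration class, not a Netty type.
final class BoundedTaskQueueDemo {

    private final Queue<Runnable> taskQueue;

    BoundedTaskQueueDemo(int maxPendingTasks) {
        // Same floor as in the constructor above: never fewer than 16 slots.
        this.taskQueue = new LinkedBlockingQueue<>(Math.max(16, maxPendingTasks));
    }

    /** Queues the task, or rejects it when the queue is already full. */
    void addTask(Runnable task) {
        if (!taskQueue.offer(task)) {
            throw new RejectedExecutionException("task queue is full");
        }
    }

    int pending() {
        return taskQueue.size();
    }

    public static void main(String[] args) {
        BoundedTaskQueueDemo queue = new BoundedTaskQueueDemo(1); // capacity is raised to 16
        for (int i = 0; i < 16; i++) {
            queue.addTask(() -> { });
        }
        try {
            queue.addTask(() -> { });
        } catch (RejectedExecutionException e) {
            System.out.println("17th task rejected: " + e.getMessage());
        }
    }
}
```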

4: chooserFactory.newChooser(this.children)

This creates the chooser for the thread executors. A NioEventLoopGroup holds a group of NioEventLoops, that is, a group of threads. When a client connects, one of them has to be picked to perform its I/O, and that is the chooser's job. It works round-robin: the first client gets the first thread in the group, the second client gets the second thread, and so on. For each client channel, the chooser reads a counter, takes the counter modulo the array length to get an index, and then increments the counter. When the number of threads is a power of 2, Netty computes the modulo with a bitwise AND; otherwise it uses the ordinary modulo operator.
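The two selection paths can be sketched in one small class. RoundRobinChooser is a hypothetical name, and the sketch folds both cases into a single class for brevity rather than reproducing Netty's own chooser classes.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration class, not a Netty type.
final class RoundRobinChooser<T> {

    private final AtomicInteger idx = new AtomicInteger();
    private final T[] items;
    private final boolean powerOfTwo;

    RoundRobinChooser(T[] items) {
        if (items.length == 0) {
            throw new IllegalArgumentException("items must not be empty");
        }
        this.items = items;
        // A positive length is a power of two when it has exactly one bit set.
        this.powerOfTwo = (items.length & -items.length) == items.length;
    }

    T next() {
        int i = idx.getAndIncrement();
        return powerOfTwo
                ? items[i & (items.length - 1)]         // bitwise AND replaces the modulo
                : items[Math.abs(i % items.length)];    // plain modulo; abs guards against counter overflow
    }

    public static void main(String[] args) {
        RoundRobinChooser<String> chooser =
                new RoundRobinChooser<>(new String[] {"loop-0", "loop-1", "loop-2"});
        for (int client = 0; client < 5; client++) {
            System.out.println("client " + client + " -> " + chooser.next());
        }
    }
}
```

The bitwise form works because for a power-of-two length n, i & (n - 1) keeps exactly the low bits that i % n would keep, and it also stays non-negative after the counter overflows.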

[Figure: overall class diagram]

Step summary:

  1. Enter the NioEventLoopGroup constructors
  2. Determine the number of threads; if none is specified, the default is the number of CPU cores * 2
  3. Build and initialize the thread executor
  4. Create the NioEventLoops one by one
  5. Create the Selector
  6. Create the taskQueue

NioEventLoopGroup startup process

Starting a NioEventLoopGroup really means starting the threads inside it, that is, starting each NioEventLoop. A NioEventLoop can be started in two ways: it is triggered when the server starts, and it is also triggered when a client connects. The following uses server startup as an example.

public ChannelFuture register(Channel channel) {
    return this.next().register(channel);
}

next() delegates to the chooser and returns a NioEventLoop, whose register method is then called. Continuing to trace the code:

public ChannelFuture register(Channel channel) {
    return this.register((ChannelPromise)(new DefaultChannelPromise(channel, this)));
}

Registration eventually reaches AbstractUnsafe.register, which hands register0 to the EventLoop as a task:

eventLoop.execute(new Runnable() {
    public void run() {
        AbstractUnsafe.this.register0(promise);
    }
});

execute is implemented in SingleThreadEventExecutor:

public void execute(Runnable task) {
    if (task == null) {
        throw new NullPointerException("task");
    } else {
        // Check whether the current thread is the EventLoop thread
        boolean inEventLoop = this.inEventLoop();
        // Add the task to the queue
        this.addTask(task);
        if (!inEventLoop) {
            // Start the thread
            this.startThread();
            if (this.isShutdown()) {
                boolean reject = false;
                try {
                    if (this.removeTask(task)) {
                        reject = true;
                    }
                } catch (UnsupportedOperationException var5) {
                }

                if (reject) {
                    reject();
                }
            }
        }

        if (!this.addTaskWakesUp && this.wakesUpForTask(task)) {
            this.wakeup(inEventLoop);
        }
    }
}

When the server starts, the calling thread is the main thread rather than the EventLoop thread, so inEventLoop is false and execution goes into the following method:

private void startThread() {
    if (this.state == 1 && STATE_UPDATER.compareAndSet(this, 1, 2)) {
        try {
            this.doStartThread();
        } catch (Throwable var2) {
            STATE_UPDATER.set(this, 1);
            PlatformDependent.throwException(var2);
        }
    }
}

private void doStartThread() {
    assert this.thread == null;
    this.executor.execute(new Runnable() {
        public void run() {
            SingleThreadEventExecutor.this.thread = Thread.currentThread();
            if (SingleThreadEventExecutor.this.interrupted) {
                SingleThreadEventExecutor.this.thread.interrupt();
            }

            boolean success = false;
            SingleThreadEventExecutor.this.updateLastExecutionTime();
            boolean var112 = false;

            int oldState;
            label1907: {
                try {
                    var112 = true;
                    // Start the NioEventLoop
                    SingleThreadEventExecutor.this.run();
                    success = true;
                    var112 = false;
                    break label1907;
                } catch (Throwable var119) {
                    // abridged
                } finally {
                    // abridged
                }
            }
        }
    });
}

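To summarize: 1: execute is called to submit the task; 2: addTask puts the task into the task queue; 3: inEventLoop checks whether the calling thread is the EventLoop thread, and if it is not, startThread starts the EventLoop thread.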
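The lazy start can be modeled in plain Java. The sketch below is a deliberately reduced stand-in, assuming an unbounded queue, no shutdown handling, and a daemon thread; MiniEventLoop, threadsStarted, and distinctThreadsFor are names made up for this illustration.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration class, not a Netty type.
final class MiniEventLoop implements Executor {

    private static final int NOT_STARTED = 1;
    private static final int STARTED = 2;

    private final LinkedBlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
    private final AtomicInteger state = new AtomicInteger(NOT_STARTED);
    private final AtomicInteger threadsStarted = new AtomicInteger();
    private volatile Thread thread;

    boolean inEventLoop() {
        return Thread.currentThread() == thread;
    }

    int threadsStarted() {
        return threadsStarted.get();
    }

    @Override
    public void execute(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }
        taskQueue.add(task);          // 2: queue the task
        if (!inEventLoop()) {         // 3: the caller is not the loop thread
            startThread();
        }
    }

    private void startThread() {
        // The compare-and-set lets only the first caller start the thread.
        if (state.compareAndSet(NOT_STARTED, STARTED)) {
            Thread t = new Thread(this::run, "mini-event-loop");
            t.setDaemon(true);
            threadsStarted.incrementAndGet();
            t.start();
        }
    }

    private void run() {
        thread = Thread.currentThread();
        try {
            while (true) {
                taskQueue.take().run();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Submits n tasks from the calling thread and returns how many distinct threads ran them. */
    static int distinctThreadsFor(MiniEventLoop loop, int n) {
        Set<Thread> seen = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(n);
        for (int i = 0; i < n; i++) {
            loop.execute(() -> {
                seen.add(Thread.currentThread());
                done.countDown();
            });
        }
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen.size();
    }
}
```

However many tasks are submitted, the state check guarantees that a single thread is started and that every task runs on it.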

Initialization of a channel

Take a channel on the server as an example: the channel type is NioServerSocketChannel

 bootstrap.group(bossGroup, workerGroup)
         .channel(NioServerSocketChannel.class)
         .option(ChannelOption.SO_BACKLOG, 100)
         .childHandler(new ChannelInitializer<SocketChannel>() {
             @Override
             protected void initChannel(SocketChannel socketChannel) throws Exception {
                 ChannelPipeline p = socketChannel.pipeline();
                 p.addLast(handler);
             }
         });

Trace into the channel method of AbstractBootstrap:

 public B channel(Class<? extends C> channelClass) {
        if (channelClass == null) {
            throw new NullPointerException("channelClass");
        } else {
            return this.channelFactory((io.netty.channel.ChannelFactory)(new ReflectiveChannelFactory(channelClass)));
        }
    }

A ReflectiveChannelFactory is created and assigned to the channelFactory, which is the factory used to produce channels.
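The idea behind a reflective factory is to look up the no-argument constructor once and invoke it each time an instance is needed. The following sketch shows that idea in isolation; ReflectiveFactory is a hypothetical generic class, not Netty's ReflectiveChannelFactory, and StringBuilder merely stands in for a channel class.

```java
import java.lang.reflect.Constructor;

// Hypothetical illustration class, not a Netty type.
final class ReflectiveFactory<T> {

    private final Constructor<? extends T> constructor;

    ReflectiveFactory(Class<? extends T> clazz) {
        try {
            // Resolve the public no-arg constructor once, up front.
            this.constructor = clazz.getConstructor();
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(
                    clazz.getSimpleName() + " has no public no-arg constructor", e);
        }
    }

    /** Creates a fresh instance on every call. */
    T newInstance() {
        try {
            return constructor.newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(
                    "Unable to create instance of " + constructor.getDeclaringClass(), e);
        }
    }

    public static void main(String[] args) {
        ReflectiveFactory<CharSequence> factory = new ReflectiveFactory<>(StringBuilder.class);
        System.out.println(factory.newInstance().getClass().getName());
    }
}
```

Passing a class instead of an instance lets the bootstrap defer creation until bind time and create a new channel for each bind.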

@Deprecated
public B channelFactory(ChannelFactory<? extends C> channelFactory) {
     if (channelFactory == null) {
         throw new NullPointerException("channelFactory");
     } else if (this.channelFactory != null) {
         throw new IllegalStateException("channelFactory set already");
     } else {
         this.channelFactory = channelFactory;
         return this.self();
     }
}

Next is the source reached by tracing the server's port binding. It mainly initializes the channel and binds the port:

 private ChannelFuture doBind(final SocketAddress localAddress) {
        // Initialize the channel and register it with an EventLoop
        final ChannelFuture regFuture = this.initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        } else if (regFuture.isDone()) {
            // Registration already finished: bind right away
            ChannelPromise promise = channel.newPromise();
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else {
            // Registration still pending: bind once it completes
            final AbstractBootstrap.PendingRegistrationPromise promise = new AbstractBootstrap.PendingRegistrationPromise(channel);
            regFuture.addListener(new ChannelFutureListener() {
                public void operationComplete(ChannelFuture future) throws Exception {
                    Throwable cause = future.cause();
                    if (cause != null) {
                        promise.setFailure(cause);
                    } else {
                        promise.registered();
                        AbstractBootstrap.doBind0(regFuture, channel, localAddress, promise);
                    }
                }
            });
            return promise;
        }
    }

Look at the initAndRegister method, which initializes a channel and registers it with a selector:

final ChannelFuture initAndRegister() {
        Channel channel = null;

        try {
            // Instantiate the channel
            channel = this.channelFactory.newChannel();
            // Call the initialization method
            this.init(channel);
        } catch (Throwable var3) {
            if (channel != null) {
                channel.unsafe().closeForcibly();
                return (new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE)).setFailure(var3);
            }

            return (new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE)).setFailure(var3);
        }
        // Call the register method to register the channel with the selector
        ChannelFuture regFuture = this.config().group().register(channel);
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }

        return regFuture;
    }

Inside newChannel, constructor.newInstance() is called:

public T newChannel() {
        try {
            return this.constructor.newInstance();
        } catch (Throwable var2) {
            throw new ChannelException("Unable to create Channel from class " + this.constructor.getDeclaringClass(), var2);
        }
}

With this call, the NioServerSocketChannel’s no-argument constructor is called:

public NioServerSocketChannel() {
        this(newSocket(DEFAULT_SELECTOR_PROVIDER));
    }

newSocket is then called:

private static java.nio.channels.ServerSocketChannel newSocket(SelectorProvider provider) {
        try {
            return provider.openServerSocketChannel();
        } catch (IOException var2) {
            throw new ChannelException("Failed to open a server socket.", var2);
        }
}

Finally trace all the way to the parent class method:

protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent);
        this.ch = ch;
        this.readInterestOp = readInterestOp;

        try {
             // Set to non-blocking mode
            ch.configureBlocking(false);
        } catch (IOException var7) {
            try {
                ch.close();
            } catch (IOException var6) {
                if (logger.isWarnEnabled()) {
                    logger.warn("Failed to close a partially initialized socket.", var6);
                }
            }

            throw new ChannelException("Failed to enter non-blocking mode.", var7);
        }
}

The code above looks familiar from plain Java NIO: it records the interest op for the channel and sets the channel to non-blocking mode.
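For comparison, here is roughly what the same setup looks like in plain JDK NIO. It is a sketch under assumptions: the channel is never bound to a port, the class PlainNioSetup and its describe method are invented for this example, and registering with interest 0 before switching to OP_ACCEPT is chosen to match the order Netty follows (register first, enable the interest op later).

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.spi.SelectorProvider;

// Hypothetical illustration class, not a Netty type.
final class PlainNioSetup {

    /** Opens, configures, and registers a server channel, then reports its state. */
    static String describe() {
        SelectorProvider provider = SelectorProvider.provider();
        try (Selector selector = provider.openSelector();
             ServerSocketChannel ch = provider.openServerSocketChannel()) {
            // A channel must be non-blocking before it can be registered with a selector.
            ch.configureBlocking(false);
            // Register without any interest first, then enable accept events.
            SelectionKey key = ch.register(selector, 0);
            key.interestOps(SelectionKey.OP_ACCEPT);
            boolean acceptInterest = (key.interestOps() & SelectionKey.OP_ACCEPT) != 0;
            return "blocking=" + ch.isBlocking() + ", acceptInterest=" + acceptInterest;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(describe());
    }
}
```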