
Preface

The first three articles analyzed the source code of the Netty server channel's initialization, registration and port binding. Logically, this article should move on to how new connections are accepted. However, while reading that code, one important component, NioEventLoop, appears so often that it gets in the way, so I decided to analyze NioEventLoop first and new connections afterwards. The NioEventLoop analysis is split into two articles: this one covers the creation and startup of NioEventLoop, and the next one covers its execution process.

Before we start, let’s think about two questions.

  1. When is a thread started in Netty?
  2. How does Netty achieve lock-free serial execution on its threads?

Functional overview

Functionally, a NioEventLoop can be understood as a thread. Once started, it keeps looping over three kinds of tasks (the "Loop" in the class name reflects this idea). The three kinds are:

  1. Network I/O events;
  2. Normal tasks, submitted by calling execute(Runnable task);
  3. Scheduled tasks, submitted by calling schedule(Runnable task, long delay, TimeUnit unit).

The NioEventLoop class has a particularly complex inheritance relationship, as shown in the UML diagram below.

As the figure shows, it implements the ScheduledExecutorService interface, so it supports scheduled tasks. It also extends SingleThreadEventExecutor, which, as the class name suggests, is a single-threaded executor.
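Because NioEventLoop implements ScheduledExecutorService, normal and scheduled tasks are submitted through the standard JDK interface. The sketch below is only a stand-in: it uses the JDK's Executors.newSingleThreadScheduledExecutor() rather than Netty, to show that a single-threaded scheduled executor runs both kinds of task on the same thread. The class name SingleThreadTaskDemo is hypothetical.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Stand-in demo: a JDK single-threaded ScheduledExecutorService, not Netty's NioEventLoop.
final class SingleThreadTaskDemo {

    // Submits one normal task and one scheduled task and returns
    // the names of the threads that ran them.
    static List<String> runBoth() {
        ScheduledExecutorService loop = Executors.newSingleThreadScheduledExecutor();
        List<String> threadNames = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(2);
        try {
            // Normal task: the equivalent of execute(Runnable task)
            loop.execute(() -> {
                threadNames.add(Thread.currentThread().getName());
                done.countDown();
            });
            // Scheduled task: the equivalent of schedule(Runnable task, long delay, TimeUnit unit)
            loop.schedule(() -> {
                threadNames.add(Thread.currentThread().getName());
                done.countDown();
            }, 50, TimeUnit.MILLISECONDS);
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            loop.shutdown();
        }
        return threadNames;
    }
}
```

Both names come back identical, because a single thread serves the normal and the scheduled task alike.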

Creation process

In Netty, NioEventLoops are created through a NioEventLoopGroup; the entry point is the following line.

EventLoopGroup workerGroup = new NioEventLoopGroup();

With the no-argument constructor of NioEventLoopGroup, Netty by default creates twice as many NioEventLoops as there are CPU cores. With the constructor that takes an int, the specified number of NioEventLoops is created. Whichever constructor is used, the following NioEventLoopGroup constructor is eventually called.

public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
                         final SelectStrategyFactory selectStrategyFactory) {
    // Call the parent class
    super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}

This constructor takes many arguments, each of which is explained below.

  • nThreads: the number of threads to create. If the no-argument NioEventLoopGroup constructor was used, nThreads is 0; if the constructor taking an int was used, nThreads is the value passed in.
  • executor: the thread executor, null by default. It is initialized later in the construction process. You can also supply a custom executor; in that case it is not null and is not initialized later.
  • selectorProvider: of type SelectorProvider, created via SelectorProvider.provider() from the JDK NIO API. The SelectorProvider object is used to create the multiplexer Selector and the server channel.
  • selectStrategyFactory: the select strategy factory, DefaultSelectStrategyFactory.INSTANCE; the INSTANCE constant is created with new DefaultSelectStrategyFactory().
  • RejectedExecutionHandlers.reject(): returns a rejection policy. When a task is added to the thread pool while its task queue is full, the task is rejected and this policy is executed (this default policy throws a RejectedExecutionException).
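To make the rejection policy concrete, here is a minimal sketch that assumes a bounded task queue. A task that cannot be enqueued is handed to a policy, and the policy used here simply throws RejectedExecutionException, in the spirit of RejectedExecutionHandlers.reject(). BoundedTaskQueue and RejectionPolicy are hypothetical names, not Netty classes.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionException;

// Hypothetical rejection-policy interface, for illustration only
interface RejectionPolicy {
    void rejected(Runnable task);
}

// Hypothetical bounded task queue that delegates to a policy when full
final class BoundedTaskQueue {
    // A policy that just throws, like the default reject() policy described above
    static final RejectionPolicy REJECT = task -> {
        throw new RejectedExecutionException();
    };

    private final BlockingQueue<Runnable> queue;
    private final RejectionPolicy policy;

    BoundedTaskQueue(int capacity, RejectionPolicy policy) {
        this.queue = new ArrayBlockingQueue<>(capacity);
        this.policy = policy;
    }

    void addTask(Runnable task) {
        // offer() returns false instead of blocking when the queue is full
        if (!queue.offer(task)) {
            policy.rejected(task);
        }
    }

    int size() {
        return queue.size();
    }
}
```

Keeping the policy as a separate object lets the caller decide what "full" means for it (throw, run the task inline, drop it) without changing the queue code.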

Next, the superclass constructor is called. NioEventLoopGroup directly extends MultithreadEventLoopGroup, so the following MultithreadEventLoopGroup constructor is invoked.

protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
    super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}

One of the constructor parameters is args, a variable-length Object array. So when the NioEventLoopGroup constructor calls up into its parent, selectorProvider, selectStrategyFactory and RejectedExecutionHandlers.reject() become the elements of args. In addition, if the nThreads passed in earlier is 0, it is replaced by DEFAULT_EVENT_LOOP_THREADS, whose value is twice the number of CPU cores; otherwise the nThreads that was passed in is used.
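The thread-count resolution can be sketched with plain JDK calls. This assumes the Netty 4.1 behaviour as I understand it, where the default can be overridden with the io.netty.eventLoopThreads system property and otherwise falls back to twice the available processors. ThreadCountResolver is a hypothetical name; Netty itself reads these values through its own helper classes.

```java
// Hypothetical sketch of how nThreads is resolved; uses only JDK calls.
final class ThreadCountResolver {

    // Default: 2 x available processors, overridable with -Dio.netty.eventLoopThreads=N,
    // and never less than 1
    static final int DEFAULT_EVENT_LOOP_THREADS = Math.max(1,
            Integer.getInteger("io.netty.eventLoopThreads",
                    Runtime.getRuntime().availableProcessors() * 2));

    // nThreads == 0 means "use the default", mirroring the constructor above
    static int resolve(int nThreads) {
        return nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads;
    }
}
```

On an 8-core machine without the system property, resolve(0) would return 16, while resolve(4) returns 4.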

The call then continues up to the next superclass: MultithreadEventLoopGroup extends MultithreadEventExecutorGroup, so the following MultithreadEventExecutorGroup constructor is called.

protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
    this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}

The nThreads, executor and args parameters are passed through unchanged, and an event executor chooser factory, DefaultEventExecutorChooserFactory.INSTANCE, is added; the INSTANCE constant is created with new DefaultEventExecutorChooserFactory(). this(...) then calls another MultithreadEventExecutorGroup constructor, which contains the core code. That constructor is very long, so I have simplified it for readability. The source code is as follows.

protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                        EventExecutorChooserFactory chooserFactory, Object... args) {
    if (nThreads <= 0) {
        throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
    }

    if (executor == null) {
        /*
         * Create the thread executor: ThreadPerTaskExecutor.
         * newDefaultThreadFactory() creates a thread factory that creates the threads
         * and names them nioEventLoopGroup-x-y.
         */
        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    }
    // Create an array size based on the number of threads passed in to hold the NioEventLoop object instance
    children = new EventExecutor[nThreads];

    for (int i = 0; i < nThreads; i ++) {
        // Flag recording whether this child was created successfully
        boolean success = false;
        try {
            // Create one NioEventLoop and store it in the children array
            children[i] = newChild(executor, args);
            success = true;
        } catch (Exception e) {
            throw new IllegalStateException("failed to create a child event loop", e);
        } finally {
            // Exception handling...
        }
    }
    // Create the event executor chooser through the chooser factory
    chooser = chooserFactory.newChooser(children);

    // omit some code...
}

There are three main pieces of logic in this constructor. First: when executor is null, create a thread executor of type ThreadPerTaskExecutor. Second: create the NioEventLoops via newChild(executor, args). Third: create an event executor chooser via chooserFactory.newChooser(children). Each of them is examined in detail below.

Creating the thread executor

if (executor == null) {
    /*
     * Create the thread executor: ThreadPerTaskExecutor.
     * newDefaultThreadFactory() creates a thread factory that creates the threads
     * and names them nioEventLoopGroup-x-y.
     */
    executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}

The first piece of core logic checks whether executor is null. Unless the user supplies one, executor is null by default, so a thread executor of type ThreadPerTaskExecutor is created with the new keyword.

Before the ThreadPerTaskExecutor constructor is called, newDefaultThreadFactory() creates a thread factory of type DefaultThreadFactory, which implements the ThreadFactory interface. Each call to its newThread() method creates a thread and gives it a meaningful name of the form nioEventLoopGroup-x-y. The x is the number of the NioEventLoopGroup: a Netty application can create a bossGroup and a workerGroup at the same time, so x tells the thread groups apart. The y is the serial number of the thread within its group. For example, nioEventLoopGroup-1-1 is the first thread of the first NioEventLoopGroup.
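The naming scheme can be illustrated with a small ThreadFactory. NamedThreadFactory is a hypothetical class loosely modeled on the idea behind DefaultThreadFactory: a static counter numbers the factories (one per thread group) and a per-factory counter numbers the threads. The real class does more (for example daemon status and priority), which this sketch omits.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical factory illustrating the "<poolName>-<groupId>-<threadId>" naming scheme
final class NamedThreadFactory implements ThreadFactory {
    // Shared by all factories: each new factory (thread group) gets the next group id
    private static final AtomicInteger POOL_ID = new AtomicInteger();

    private final String prefix;
    // Per factory: each new thread in this group gets the next thread id, starting at 1
    private final AtomicInteger nextId = new AtomicInteger();

    NamedThreadFactory(String poolName) {
        this.prefix = poolName + '-' + POOL_ID.incrementAndGet() + '-';
    }

    @Override
    public Thread newThread(Runnable r) {
        return new Thread(r, prefix + nextId.incrementAndGet());
    }
}
```

If a boss factory and a worker factory are created in that order, the first worker thread would be named nioEventLoopGroup-2-1 when the pool name is "nioEventLoopGroup".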

The source code of ThreadPerTaskExecutor is simple. It implements the Executor interface and overrides the execute() method; every call to its execute() method creates and starts a new thread. One might ask: if every call creates a thread, won't this create a lot of threads? In fact, ThreadPerTaskExecutor's execute() method is called only once per NioEventLoop, so each NioEventLoop gets exactly one thread. Once that thread has started, execute() is not called again for that NioEventLoop, so no extra threads are created.

public final class ThreadPerTaskExecutor implements Executor {

    private final ThreadFactory threadFactory;

    public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
        if (threadFactory == null) {
            throw new NullPointerException("threadFactory");
        }
        this.threadFactory = threadFactory;
    }

    @Override
    public void execute(Runnable command) {
        // threadFactory is the DefaultThreadFactory created earlier
        // Create a thread through the thread factory's newThread() method and start it
        threadFactory.newThread(command).start();
    }
}

Creating the NioEventLoop

// Create an array size based on the number of threads passed in to hold the NioEventLoop object instance
children = new EventExecutor[nThreads];

for (int i = 0; i < nThreads; i ++) {
    // Flag recording whether this child was created successfully
    boolean success = false;
    try {
        // Create one NioEventLoop and store it in the children array
        children[i] = newChild(executor, args);
        success = true;
    } catch (Exception e) {
        throw new IllegalStateException("failed to create a child event loop", e);
    } finally {
        // Exception handling...
    }
}

Before the second piece of core logic runs, an array of type EventExecutor is created whose size equals the number of threads passed in, and it is assigned to the children property of NioEventLoopGroup. A NioEventLoopGroup contains a group of NioEventLoop threads, and children is where they are stored. The newly created array contains only null elements, so the for loop fills it: each iteration creates a NioEventLoop via newChild(executor, args) and assigns it to the corresponding array element.

When newChild(executor, args) is called, the first argument, executor, is the ThreadPerTaskExecutor created in the previous step, and the second is the variable-length args array, whose elements and their roles were explained above. newChild(executor, args) is defined in the NioEventLoopGroup class; its source code is shown below.

protected EventLoop newChild(Executor executor, Object... args) throws Exception {
    /*
     * executor: the ThreadPerTaskExecutor
     * args: a variable-length array that holds the three arguments passed in earlier:
     *   args[0] SelectorProvider.provider(), from the JDK NIO API; the SelectorProvider
     *           is later used to create the multiplexer Selector and the server channel
     *   args[1] DefaultSelectStrategyFactory.INSTANCE, the default select strategy factory,
     *           created via new DefaultSelectStrategyFactory()
     *   args[2] RejectedExecutionHandlers.reject(), a rejection policy: when a task is added
     *           while the task queue is full, the task is rejected and this policy is executed
     */
    return new NioEventLoop(this, executor, (SelectorProvider) args[0],
        ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}

As you can see, the constructor for NioEventLoop is called directly in newChild(). The NioEventLoop constructor is listed below.

NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
             SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
    // Call the parent constructors, which initialize the task queues and wrap the executor
    super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
    if (selectorProvider == null) {
        throw new NullPointerException("selectorProvider");
    }
    if (strategy == null) {
        throw new NullPointerException("selectStrategy");
    }
    provider = selectorProvider;
    // openSelector() creates the multiplexer; the underlying data structure of its selected keys has been replaced
    final SelectorTuple selectorTuple = openSelector();
    // Netty's wrapper SelectedSelectionKeySetSelector (the native selector itself if optimization is disabled)
    selector = selectorTuple.selector;
    // The JDK's native selector, whose selectedKeys and publicSelectedKeys fields have been replaced
    unwrappedSelector = selectorTuple.unwrappedSelector;
    selectStrategy = strategy;
}

The NioEventLoop constructor does two main things: it calls up the chain of parent constructors, and it calls openSelector(). The parent constructors initialize two task queues, tailTasks and taskQueue, both of type MpscQueue, and wrap the incoming executor in an anonymous class via ThreadExecutorMap. (What is an MpscQueue? The name stands for "multiple producer, single consumer": multiple threads may add items to the queue concurrently, but only one thread takes items from it.)
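The multiple-producer, single-consumer pattern can be sketched with the JDK alone. This is a stand-in, not Netty's MpscQueue: ConcurrentLinkedQueue is a general multi-producer, multi-consumer queue, so it is safe here but gives up the optimizations a dedicated MPSC queue can make. MpscDemo is a hypothetical name; several threads offer items while the calling thread is the only one that polls, much as only the NioEventLoop thread drains its taskQueue.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;

// Hypothetical demo of the MPSC usage pattern, with ConcurrentLinkedQueue as a stand-in
final class MpscDemo {

    // Starts `producers` threads that each offer `perProducer` items; the calling
    // thread is the single consumer. Returns the number of items consumed.
    static int produceAndConsume(int producers, int perProducer) {
        Queue<Integer> queue = new ConcurrentLinkedQueue<>();
        CountDownLatch finished = new CountDownLatch(producers);
        for (int p = 0; p < producers; p++) {
            new Thread(() -> {
                for (int i = 0; i < perProducer; i++) {
                    queue.offer(i); // many threads may offer concurrently
                }
                finished.countDown();
            }).start();
        }
        int consumed = 0;
        // Only this thread polls, mirroring the single event-loop consumer
        while (true) {
            // Read the latch before polling: if all producers were done and the
            // queue is still empty afterwards, nothing more can arrive
            boolean allDone = finished.getCount() == 0;
            Integer item = queue.poll();
            if (item != null) {
                consumed++;
            } else if (allDone) {
                break;
            } else {
                Thread.yield();
            }
        }
        return consumed;
    }
}
```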

The next step calls openSelector() to create a multiplexer Selector and save it in the NioEventLoop, and this is where Netty optimizes the multiplexer. In the native Selector, the data structure that stores SelectionKeys is a HashSet, whose add operation can degrade to O(n) in the extreme case of many hash collisions. Netty replaces the HashSet with an array-backed structure, whose add operation is O(1) (amortized, because the array occasionally has to grow).
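The idea behind the array-based key set can be sketched as follows. ArrayBackedKeySet is a hypothetical class modeled on the idea of Netty's SelectedSelectionKeySet, not a copy of it: add() just appends to an array and doubles the array when it is full, so it never hashes. It also does not check for duplicates, so it only fits a usage pattern like Netty's, where the set is drained and reset after every select.

```java
import java.util.AbstractSet;
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical array-backed "set": amortized O(1) add, no hashing, no duplicate check
final class ArrayBackedKeySet<T> extends AbstractSet<T> {
    private Object[] keys = new Object[4];
    private int size;

    @Override
    public boolean add(T key) {
        if (key == null) {
            return false;
        }
        keys[size++] = key;
        if (size == keys.length) {
            // Double the capacity when the array becomes full
            keys = Arrays.copyOf(keys, size << 1);
        }
        return true;
    }

    @Override
    public int size() {
        return size;
    }

    // Empties the set by nulling used slots, so the array can be reused after each select
    void reset() {
        Arrays.fill(keys, 0, size, null);
        size = 0;
    }

    @Override
    public Iterator<T> iterator() {
        return new Iterator<T>() {
            private int idx;

            @Override
            public boolean hasNext() {
                return idx < size;
            }

            @SuppressWarnings("unchecked")
            @Override
            public T next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                return (T) keys[idx++];
            }
        };
    }
}
```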

The source code for the openSelector() method is long, and can be summarized in the following steps:

  • Call the JDK API to create the multiplexer Selector: provider.openSelector();
  • Check the DISABLE_KEY_SET_OPTIMIZATION constant to see whether optimization is disabled. If it is true, the underlying data structure is not replaced, i.e. no optimization is performed, and the native Selector is returned. The constant defaults to false (optimization enabled) and is configured through the io.netty.noKeySetOptimization system property.
  • Load SelectorImpl by reflection: Class.forName("sun.nio.ch.SelectorImpl", false, PlatformDependent.getSystemClassLoader());
  • Obtain the selectedKeys and publicSelectedKeys fields of the native SelectorImpl by reflection (both are of type HashSet), make them accessible with setAccessible(true), and then, again by reflection, replace both fields with Netty's SelectedSelectionKeySet, whose underlying data structure is an array;
  • Finally, wrap the native Selector together with the SelectedSelectionKeySet into Netty's custom multiplexer SelectedSelectionKeySetSelector, package the JDK's native Selector and Netty's custom Selector into a SelectorTuple, and return the SelectorTuple. Note: by the time it is returned, the underlying selected-key data structure of the native selector has already been replaced with an array.
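The reflection step can be illustrated without touching JDK internals. In this sketch, FakeSelectorImpl and FieldSwapper are hypothetical classes, and the private field plays the role of selectedKeys in sun.nio.ch.SelectorImpl. It shows only the generic technique (getDeclaredField, setAccessible(true), Field.set). On JDK 9 and later, the real SelectorImpl fields may be inaccessible unless the module is opened, which is why the helper reports failure instead of throwing.

```java
import java.lang.reflect.Field;
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for a class whose private key set we want to replace
final class FakeSelectorImpl {
    private Set<Object> selectedKeys = new HashSet<>();

    Set<Object> selectedKeys() {
        return selectedKeys;
    }
}

final class FieldSwapper {
    // Replaces the value of a declared (possibly private) field via reflection.
    // Returns false if the field does not exist or access is denied.
    static boolean replaceField(Object target, String fieldName, Object replacement) {
        try {
            Field field = target.getClass().getDeclaredField(fieldName);
            field.setAccessible(true); // suppress the Java access check for private fields
            field.set(target, replacement);
            return true;
        } catch (NoSuchFieldException | IllegalAccessException | RuntimeException e) {
            // RuntimeException covers e.g. InaccessibleObjectException on JDK 9+
            return false;
        }
    }
}
```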

At this point, the creation of the NioEventLoop is complete. To summarize what creating a NioEventLoop does:

  • Two queues are initialized, taskQueue and tailTasks, both of type MpscQueue. taskQueue stores tasks; once the NioEventLoop has started, it takes tasks from this queue and executes them. tailTasks holds tasks for finishing work.
  • The ThreadPerTaskExecutor passed in is wrapped in an anonymous class via ThreadExecutorMap and saved in the NioEventLoop's executor property. Later, the NioEventLoop uses this thread executor to execute tasks.
  • The rejection policy RejectedExecutionHandlers.reject() and the select strategy created by the factory DefaultSelectStrategyFactory.INSTANCE are saved in the NioEventLoop, to be retrieved from it later.
  • The JDK's native multiplexer Selector is saved in the NioEventLoop's unwrappedSelector property, and Netty's custom multiplexer SelectedSelectionKeySetSelector is saved in its selector property. In both, the selected-key set is backed by an array.

The event executor chooser factory

Once all the NioEventLoops are created, the third piece of core logic runs: an event executor chooser is created through the chooser factory and assigned to the chooser property. This may be confusing at first: what does it mean, and why is this chooser needed?

Netty's NioEventLoopGroup contains a group of threads, namely a group of NioEventLoops. When a new connection reaches the server, its I/O events must be read and written, so a NioEventLoop is bound to the new connection, that is, to the client channel, and all subsequent reads and writes on that client channel are handled by the bound NioEventLoop. Since there are multiple NioEventLoop threads, which one should be picked from the group to bind to the client channel?

Netty uses round-robin: when the first client channel arrives, it takes the first thread in the group, i.e. the first element of the children array; when the second client channel arrives, it takes the second element, and so on, cycling through the children array. The algorithm is simple to implement: keep a counter, and for each new client channel use the counter modulo the array length as the index, then increment the counter by one.

Because a modulo operation is slower than a bit operation, Netty optimizes it. When the number of threads is a power of 2, Netty computes the modulo with a bit operation; otherwise, it uses the ordinary modulo. The two approaches are implemented by two classes, PowerOfTwoEventExecutorChooser and GenericEventExecutorChooser, both of type EventExecutorChooser (literally, an "event executor chooser").

The chooser property holds this chooser instance; whether it is a PowerOfTwoEventExecutorChooser or a GenericEventExecutorChooser depends on nThreads. The source code of newChooser(EventExecutor[] executors) is shown below.

public EventExecutorChooser newChooser(EventExecutor[] executors) {
    // executors is the array of NioEventLoop objects created by new NioEventLoop()
    // executors.length equals the nThreads value passed in earlier
    if (isPowerOfTwo(executors.length)) {
        return new PowerOfTwoEventExecutorChooser(executors);
    } else {
        return new GenericEventExecutorChooser(executors);
    }
}

The isPowerOfTwo(int val) method checks whether the value passed in is a power of 2. How? Again with a bit operation. For example, if 8 is passed in, 8 and -8 in binary are:

 8: 0000 0000 0000 0000 0000 0000 0000 1000
-8: 1111 1111 1111 1111 1111 1111 1111 1000

ANDing 8 with -8 gives 8 again, the same number, so 8 is a power of 2. For a number that is not a power of 2, such as 6 (binary 0110), -6 ends in 1010, and 6 & -6 = 2, which differs from 6.

private static boolean isPowerOfTwo(int val) {
    return (val & -val) == val;
}
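Both chooser strategies can be sketched in one class. RoundRobinChooser is hypothetical and only modeled on PowerOfTwoEventExecutorChooser and GenericEventExecutorChooser (their exact code may differ between Netty versions). The generic type E stands in for NioEventLoop, and isPowerOfTwo is the same bit trick shown above.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical round-robin chooser combining the bitmask and modulo strategies
final class RoundRobinChooser<E> {
    private final AtomicInteger idx = new AtomicInteger();
    private final E[] executors;
    private final boolean powerOfTwo;

    RoundRobinChooser(E[] executors) {
        this.executors = executors;
        this.powerOfTwo = isPowerOfTwo(executors.length);
    }

    static boolean isPowerOfTwo(int val) {
        return (val & -val) == val;
    }

    E next() {
        int i = idx.getAndIncrement();
        if (powerOfTwo) {
            // length is 2^k, so (i & (length - 1)) equals i mod length, even for negative i
            return executors[i & (executors.length - 1)];
        }
        // Math.abs keeps the index non-negative once the counter wraps past Integer.MAX_VALUE
        return executors[Math.abs(i % executors.length)];
    }
}
```

With four elements {a, b, c, d}, six calls to next() yield a, b, c, d, a, b; with three elements the modulo branch gives the same cyclic order.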

At this point, the creation of the NioEventLoopGroup, and with it the creation of the NioEventLoops, is complete. A NioEventLoop is a thread that must be started before it can process tasks in its loop. So when is a NioEventLoop started?

Startup

NioEventLoop startup can be triggered either during server startup or when a new connection arrives. The following uses server startup as the example. During server startup, the following code in MultithreadEventLoopGroup is executed.

public ChannelFuture register(Channel channel) {
    return next().register(channel);
}

next() delegates to the chooser, which has two implementations: PowerOfTwoEventExecutorChooser and GenericEventExecutorChooser. They implement next() differently: one picks the NioEventLoop from the children array with a bit operation, the other with a modulo. Either way, a NioEventLoop is returned.

So what actually runs here is NioEventLoop's register(Channel) method, which eventually reaches the following code:

eventLoop.execute(new Runnable() {
    @Override
    public void run() {
        register0(promise);
    }
});

Here NioEventLoop's execute() method is called. NioEventLoop extends SingleThreadEventExecutor, and execute(task) is defined in SingleThreadEventExecutor; its source code is as follows.

public void execute(Runnable task) {
    if (task == null) {
        throw new NullPointerException("task");
    }

    // Determine whether the current thread is equal to the thread in NioEventLoop. Return true for equality
    boolean inEventLoop = inEventLoop();
    // Add the task to the thread queue
    addTask(task);
    if (!inEventLoop) {
        // Start the thread
        startThread();
        // omit some code...
    }
    // Omit some code
}

inEventLoop() returns false because the current thread is the main thread rather than the NioEventLoop's own thread (which has not been created yet), so execution enters the if block and calls startThread() to start the thread.

private void startThread() {
    // Only attempt to start the thread if it has not been started yet
    if (state == ST_NOT_STARTED) {
        // Try setting ST_NOT_STARTED to ST_STARTED
        if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
            boolean success = false;
            try {
                doStartThread();
                success = true;
            } finally {
                // If doStartThread() fails, roll back the state set by STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)
                if (!success) {
                    STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
                }
            }
        }
    }
}

startThread() first checks whether the NioEventLoop has not been started yet, and only in that case tries to start the thread. Before starting it, the state is marked as started with a CAS, and only after the CAS succeeds is doStartThread() called. The simplified source code of doStartThread() is shown below.

private void doStartThread() {
    assert thread == null;
    // Actually start the thread
    executor.execute(new Runnable() {
        @Override
        public void run() {
            // Save the thread
            thread = Thread.currentThread();
            if (interrupted) {
                thread.interrupt();
            }

            boolean success = false;
            updateLastExecutionTime();
            try {
                // Start the NioEventLoop's event loop
                SingleThreadEventExecutor.this.run();
                success = true;
            } catch (Throwable t) {
                logger.warn("Unexpected exception from an event executor: ", t);
            } finally {
                // omit some code....
            }
        }
    });
}

doStartThread() calls the execute() method of the executor property. What is the value of that property? When the NioEventLoop was created, a ThreadPerTaskExecutor was created, wrapped by ThreadExecutorMap in an anonymous class, and that anonymous class was assigned to the executor property. So the anonymous class's execute(Runnable task) method is called here, and it ultimately calls ThreadPerTaskExecutor's execute(Runnable task) method. That method was briefly analyzed in an earlier section; it is repeated here for convenience.

public void execute(Runnable command) {
    // threadFactory is the DefaultThreadFactory created earlier
    // Create a thread through the thread factory's newThread() method and start the thread
    threadFactory.newThread(command).start();
}

As you can see, execute() calls the thread factory's newThread(command) method to create a thread and then calls the thread's start() method. Once the thread starts, the run() method of the Runnable passed in is called back, which here is the Runnable passed in doStartThread(). As the doStartThread() source shows, that run() method first saves the newly created thread and then calls SingleThreadEventExecutor.this.run(). This line starts the NioEventLoop's loop. The method is very long and contains the core of NioEventLoop: it is essentially an infinite for loop that keeps processing events and tasks. It will be analyzed in detail in the next article.

At this point, the NioEventLoop thread has started and keeps running in that infinite for loop, so the NioEventLoop is up and running.
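The startup path described in this section (execute, then addTask, then the inEventLoop check, then startThread with a CAS, then executor.execute) can be condensed into a small sketch. MiniEventLoop is hypothetical and heavily simplified: it has no selector, no scheduled tasks, no shutdown and no error handling, and it blocks on a LinkedBlockingQueue instead of using an MPSC queue. It does show two properties discussed above: the thread is created lazily on the first execute() from outside the loop, and the CAS guarantees that only one thread is ever started.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, much-simplified sketch of the lazy-start pattern
final class MiniEventLoop implements Executor {
    private static final int ST_NOT_STARTED = 1;
    private static final int ST_STARTED = 2;

    private final AtomicInteger state = new AtomicInteger(ST_NOT_STARTED);
    private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
    private final Executor executor; // e.g. a thread-per-task executor
    private volatile Thread thread;  // set once the loop thread is running

    MiniEventLoop(Executor executor) {
        this.executor = executor;
    }

    boolean inEventLoop() {
        return Thread.currentThread() == thread;
    }

    @Override
    public void execute(Runnable task) {
        boolean inEventLoop = inEventLoop();
        taskQueue.offer(task); // add the task first, as in the execute() shown above
        if (!inEventLoop) {
            startThread();
        }
    }

    private void startThread() {
        // Only the caller that wins the CAS starts the thread; everyone else skips this
        if (state.get() == ST_NOT_STARTED && state.compareAndSet(ST_NOT_STARTED, ST_STARTED)) {
            boolean success = false;
            try {
                doStartThread();
                success = true;
            } finally {
                if (!success) {
                    state.compareAndSet(ST_STARTED, ST_NOT_STARTED); // roll back
                }
            }
        }
    }

    private void doStartThread() {
        executor.execute(() -> {
            thread = Thread.currentThread(); // save the loop thread
            // Endless loop: take tasks one at a time, in order, on this single thread
            while (true) {
                try {
                    taskQueue.take().run();
                } catch (InterruptedException e) {
                    return; // stop looping if the thread is interrupted
                }
            }
        });
    }
}
```

Passing an executor that counts how often it is called makes the second property observable: after many execute() calls, it has been invoked exactly once.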

Conclusion

  • Taking new NioEventLoopGroup() as the entry point, this article analyzed the NioEventLoopGroup source code and, through it, the creation of NioEventLoop, and introduced Netty's optimization of NIO. It then analyzed how NioEventLoop is started, using the startup of the server channel as the example.
  • By default, Netty creates twice as many NioEventLoop threads as there are CPU cores; if a number is specified, that many NioEventLoops are created.
  • Finally, here are the answers to the two questions posed at the beginning.
  • First question: when is a thread started in Netty? There are two triggers: server startup and the arrival of a new connection. In both cases, ThreadPerTaskExecutor's execute(Runnable command) method creates a thread through the thread factory and then calls the thread's start() method. Once started, the thread calls back the Runnable task's run() method, which calls SingleThreadEventExecutor.this.run() and thereby NioEventLoop's run() method, starting the NioEventLoop.
  • Second question: how does Netty achieve lock-free serial execution? Each NioEventLoop contains exactly one thread, and each channel is bound to exactly one NioEventLoop, whose thread handles all of that channel's subsequent I/O operations. Multiple NioEventLoop threads therefore never compete to process the same channel, so all operations on a NioEventLoop are processed serially with no lock contention, i.e. serial and lock-free. One might ask whether serial processing reduces the system's throughput. It does not, because Netty runs multiple NioEventLoop threads, each processing its own channels serially at the same time. The service thus runs in parallel across threads with no lock contention between them, which greatly improves its performance.
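The serial lock-free argument can be demonstrated with plain JDK executors. In this hypothetical sketch, each single-threaded ExecutorService stands in for a NioEventLoop, and Channel is a stand-in class, not Netty's. Each channel is bound round-robin to one loop, and its counter is a plain long with no lock and no volatile. Because every update to a channel runs on that channel's single loop thread, no updates are lost even though several loops run in parallel.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo: one thread per loop, one loop per channel, no locks
final class SerialLockFreeDemo {

    // Stand-in for a client channel with unsynchronized mutable state
    static final class Channel {
        final ExecutorService boundLoop;
        long bytesRead; // plain field: only the bound loop thread ever updates it

        Channel(ExecutorService boundLoop) {
            this.boundLoop = boundLoop;
        }
    }

    // Returns each channel's counter after eventsPerChannel increments per channel
    static long[] run(int loops, int channels, int eventsPerChannel) {
        ExecutorService[] group = new ExecutorService[loops];
        for (int i = 0; i < loops; i++) {
            group[i] = Executors.newSingleThreadExecutor();
        }
        try {
            Channel[] chans = new Channel[channels];
            for (int c = 0; c < channels; c++) {
                chans[c] = new Channel(group[c % loops]); // round-robin binding
            }
            // Events are submitted from the caller thread, but always run on ch's own loop
            for (int e = 0; e < eventsPerChannel; e++) {
                for (Channel ch : chans) {
                    ch.boundLoop.execute(() -> ch.bytesRead++);
                }
            }
            long[] result = new long[channels];
            for (int c = 0; c < channels; c++) {
                Channel ch = chans[c];
                // Read on the same loop too; Future.get() safely publishes the value
                result[c] = ch.boundLoop.submit(() -> ch.bytesRead).get();
            }
            return result;
        } catch (InterruptedException | ExecutionException ex) {
            throw new IllegalStateException(ex);
        } finally {
            for (ExecutorService loop : group) {
                loop.shutdown();
            }
        }
    }
}
```

Each loop drains its queue in FIFO order, so the final read for a channel runs after all of that channel's increments.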

Recommended reading

  • How to evolve from BIO to NIO to Netty
  • Netty source code analysis series: the Reactor thread model
  • Netty source code analysis series: server Channel initialization
  • Netty source code analysis series: server Channel registration
  • Netty source code analysis series: server Channel port binding
  • Netty source code analysis series: NioEventLoop creation and startup