At the core of Netty is the Reactor thread, which corresponds to the widely used NioEventLoop in a Netty project. So what goes on inside NioEventLoop? How does Netty keep its event loop polling efficiently while still running tasks on time? How does it work around JDK NIO bugs gracefully? With these questions in mind, this article walks step by step through how the Netty Reactor thread really works (source code based on 4.1.6.Final).

Starting the Reactor thread

NioEventLoop's run method is the body of the Reactor thread, and the thread is started when the first task is added.

Here is the execute method of SingleThreadEventExecutor, NioEventLoop's parent class:

@Override
public void execute(Runnable task) {
    // ...
    boolean inEventLoop = inEventLoop();
    if (inEventLoop) {
        addTask(task);
    } else {
        startThread();
        addTask(task);
        // ...
    }
    // ...
}

When an external thread adds a task to the reactor's task queue, startThread() is called. Netty checks whether the reactor thread has already been started, starts it if not, and then adds the task to the queue.

private void startThread() {
    if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
        if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
            doStartThread();
        }
    }
}
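The start-once check in startThread() is a plain compare-and-set on the executor state. The hypothetical LazyStartExecutor below is a minimal sketch of that idea outside Netty. It reuses the names STATE_UPDATER, ST_NOT_STARTED and ST_STARTED from the excerpt. Its doStartThread() does not start a thread; it only counts how often it runs. When several callers race, exactly one of them wins the CAS.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

// Hypothetical illustration of the "start the thread only once" pattern (not Netty's class).
class LazyStartExecutor {
    private static final int ST_NOT_STARTED = 1;
    private static final int ST_STARTED = 2;
    private static final AtomicIntegerFieldUpdater<LazyStartExecutor> STATE_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(LazyStartExecutor.class, "state");

    private volatile int state = ST_NOT_STARTED;
    // Counts how many times doStartThread() actually ran
    final AtomicInteger startCount = new AtomicInteger();

    void startThread() {
        // Cheap volatile read first, then a CAS so that only one caller wins
        if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
            if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                doStartThread();
            }
        }
    }

    private void doStartThread() {
        startCount.incrementAndGet(); // stand-in for creating the reactor thread
    }

    // Calls startThread() concurrently from `callers` threads and returns the start count
    static int raceStart(int callers) throws InterruptedException {
        LazyStartExecutor executor = new LazyStartExecutor();
        CountDownLatch go = new CountDownLatch(1);
        Thread[] threads = new Thread[callers];
        for (int i = 0; i < callers; i++) {
            threads[i] = new Thread(() -> {
                try {
                    go.await();
                } catch (InterruptedException e) {
                    return;
                }
                executor.startThread();
            });
            threads[i].start();
        }
        go.countDown(); // release all callers at once
        for (Thread t : threads) {
            t.join();
        }
        return executor.startCount.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(raceStart(8)); // prints 1
    }
}
```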

In doStartThread, SingleThreadEventExecutor calls its executor's execute method with a Runnable that wraps the call to NioEventLoop's run method. The executor then runs that Runnable on a thread.

private void doStartThread() {
    // ...
    executor.execute(new Runnable() {
        @Override
        public void run() {
            thread = Thread.currentThread();
            // ...
            SingleThreadEventExecutor.this.run();
            // ...
        }
    });
}

This thread is created by the executor and is the actual Netty Reactor thread. The executor defaults to a ThreadPerTaskExecutor.

Each time its execute method is called, the default ThreadPerTaskExecutor uses a DefaultThreadFactory to create a new FastThreadLocalThread, and that thread is the Reactor thread.

ThreadPerTaskExecutor

public void execute(Runnable command) {
    threadFactory.newThread(command).start();
}

Why combining a ThreadPerTaskExecutor with a DefaultThreadFactory produces a FastThreadLocalThread is not explained in detail here.
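The sketch below makes the one-thread-per-call behavior concrete with an Executor shaped like ThreadPerTaskExecutor. SimpleThreadPerTaskExecutor and the demo-thread-N naming scheme are hypothetical stand-ins for Netty's class and its DefaultThreadFactory. The only point is that every execute call creates and starts a fresh thread.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch with the same shape as ThreadPerTaskExecutor: one new thread per task.
class SimpleThreadPerTaskExecutor implements Executor {
    private final ThreadFactory threadFactory;

    SimpleThreadPerTaskExecutor(ThreadFactory threadFactory) {
        this.threadFactory = threadFactory;
    }

    @Override
    public void execute(Runnable command) {
        // Every call creates and starts a brand-new thread
        threadFactory.newThread(command).start();
    }

    // Runs `tasks` tasks and returns the names of the threads that executed them
    static List<String> runTasks(int tasks) throws InterruptedException {
        AtomicInteger counter = new AtomicInteger();
        // Hypothetical naming scheme, not DefaultThreadFactory's format
        ThreadFactory factory = r -> new Thread(r, "demo-thread-" + counter.incrementAndGet());
        Executor executor = new SimpleThreadPerTaskExecutor(factory);
        List<String> names = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(tasks);
        for (int i = 0; i < tasks; i++) {
            executor.execute(() -> {
                names.add(Thread.currentThread().getName());
                done.countDown();
            });
        }
        done.await();
        return names;
    }

    public static void main(String[] args) throws InterruptedException {
        // Three distinct thread names; their order depends on scheduling
        System.out.println(runTasks(3));
    }
}
```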

In a standard Netty program, NioEventLoopGroup calls the constructor of its superclass, MultithreadEventExecutorGroup, which contains the following code:

protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                        EventExecutorChooserFactory chooserFactory, Object... args) {
    // ...
    if (executor == null) {
        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    }
    // ...
}

The executor is then passed to each NioEventLoop via newChild:

@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
    return new NioEventLoop(this, executor, (SelectorProvider) args[0],
        ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}

That covers creating and starting the Reactor thread. In short, the Netty Reactor thread is created when the first task is added. The thread itself is a FastThreadLocalThread (more on this later), and its body is NioEventLoop's run method.

Executing the Reactor thread

Now let's focus on NioEventLoop's run method:

@Override
protected void run() {
    for (;;) {
        try {
            switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                case SelectStrategy.CONTINUE:
                    continue;
                case SelectStrategy.SELECT:
                    select(wakenUp.getAndSet(false));
                    if (wakenUp.get()) {
                        selector.wakeup();
                    }
                default:
                    // fallthrough
            }
            processSelectedKeys();
            runAllTasks(...);
        } catch (Throwable t) {
            handleLoopException(t);
        }
        // ...
    }
}

Stripped down to its backbone, what the Reactor thread does is simple, as the following diagram illustrates.

Roughly, the Reactor thread runs a three-step loop:

1. Poll for I/O events on all channels registered with the reactor thread's Selector

select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
    selector.wakeup();
}

2. Process the channels on which network I/O events occurred

processSelectedKeys();

3. Process the task queue

runAllTasks(...);
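The three steps can be tried out with plain JDK NIO. The sketch below is a hypothetical ToyReactor, not Netty's NioEventLoop. It polls a Selector, falling back to selectNow() when tasks are already queued, then drains the selected keys, then runs the queued tasks. A Pipe stands in for a network channel. The loop runs a fixed number of iterations so that the demo terminates.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical toy reactor (not Netty's NioEventLoop): select -> process keys -> run tasks.
class ToyReactor {
    private final Selector selector;
    private final Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<>();
    final StringBuilder log = new StringBuilder();

    ToyReactor() throws IOException {
        selector = Selector.open();
    }

    void register(Pipe.SourceChannel source) throws IOException {
        source.configureBlocking(false);
        source.register(selector, SelectionKey.OP_READ);
    }

    void addTask(Runnable task) {
        taskQueue.add(task);
        selector.wakeup(); // makes a blocked (or the next) select() return early
    }

    // Runs a fixed number of loop iterations so that the demo terminates
    void runIterations(int iterations, long timeoutMillis) throws IOException {
        for (int i = 0; i < iterations; i++) {
            // 1. Poll for I/O events, without blocking if tasks are already queued
            if (taskQueue.isEmpty()) {
                selector.select(timeoutMillis);
            } else {
                selector.selectNow();
            }
            // 2. Process the channels that have I/O events
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey key = it.next();
                it.remove();
                if (key.isReadable()) {
                    ByteBuffer buf = ByteBuffer.allocate(64);
                    int read = ((Pipe.SourceChannel) key.channel()).read(buf);
                    if (read > 0) {
                        log.append("read:")
                           .append(new String(buf.array(), 0, read, StandardCharsets.UTF_8))
                           .append(';');
                    }
                }
            }
            // 3. Run every task currently in the queue
            Runnable task;
            while ((task = taskQueue.poll()) != null) {
                task.run();
            }
        }
    }

    static String demo() throws IOException {
        ToyReactor reactor = new ToyReactor();
        Pipe pipe = Pipe.open();
        try {
            reactor.register(pipe.source());
            pipe.sink().write(ByteBuffer.wrap("hi".getBytes(StandardCharsets.UTF_8)));
            reactor.runIterations(1, 5000); // blocks until the pipe is readable
            reactor.addTask(() -> reactor.log.append("task;"));
            reactor.runIterations(1, 5000); // a task is queued, so selectNow() is used
        } finally {
            pipe.sink().close();
            pipe.source().close();
            reactor.selector.close();
        }
        return reactor.log.toString();
    }

    public static void main(String[] args) throws IOException {
        System.out.println(demo()); // prints read:hi;task;
    }
}
```

The first iteration blocks until the pipe becomes readable. The second finds a queued task and therefore polls without blocking before running it.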

Each step is described in detail below

The select operation

select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
    selector.wakeup();
}

wakenUp indicates whether a blocking select operation should be woken up. Before each new round of the loop, Netty resets wakenUp to false to mark the start of a new round. The select operation itself breaks down into the following steps.

1. A scheduled task's deadline is about to expire: interrupt the polling

int selectCnt = 0;
long currentTimeNanos = System.nanoTime();
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);

for (;;) {
    long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
    if (timeoutMillis <= 0) {
        if (selectCnt == 0) {
            selector.selectNow();
            selectCnt = 1;
        }
        break;
    }
    // ...
}

In NioEventLoop, the Reactor thread's select operation is itself a for loop. In the first step of each iteration, Netty checks the deadline of the first task in the scheduled task queue. If that deadline is less than 0.5 ms away, the loop is exited. If no select has been performed yet before breaking out (selectCnt == 0), selectNow() is called once. This method returns immediately without blocking.
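The timeout arithmetic rounds the remaining time to the nearest millisecond, and that is where the 0.5 ms figure comes from. Once less than 0.5 ms is left, timeoutMillis becomes 0 or negative and the loop exits. The hypothetical SelectTimeout helper below checks this using the same expression:

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper using the same expression as the select loop above.
class SelectTimeout {
    // Rounds the remaining nanoseconds to whole milliseconds (half up when positive)
    static long timeoutMillis(long selectDeadLineNanos, long currentTimeNanos) {
        return (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
    }

    public static void main(String[] args) {
        long now = 0L;
        long us = TimeUnit.MICROSECONDS.toNanos(1);
        System.out.println(timeoutMillis(now + 400 * us, now));    // 0: less than 0.5 ms left, stop polling
        System.out.println(timeoutMillis(now + 500 * us, now));    // 1
        System.out.println(timeoutMillis(now + 10_400 * us, now)); // 10
        System.out.println(timeoutMillis(now - 3_000 * us, now));  // -2: deadline already passed
    }
}
```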

Note that Netty's scheduled task queue is sorted by delay, from shortest to longest. The delayNanos(currentTimeNanos) method returns the remaining delay of the first scheduled task:

protected long delayNanos(long currentTimeNanos) {
    ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
    if (scheduledTask == null) {
        return SCHEDULE_PURGE_INTERVAL;
    }
    return scheduledTask.delayNanos(currentTimeNanos);
}
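Because the queue is ordered by deadline, peeking at its head is enough to find the next deadline. The sketch below models this with java.util.PriorityQueue. ScheduledQueueSketch, Task and FALLBACK_NANOS are hypothetical names. The one-second fallback is an assumption standing in for SCHEDULE_PURGE_INTERVAL, and clamping the delay at zero is a choice of this sketch.

```java
import java.util.PriorityQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a scheduled task queue ordered by deadline (not Netty's classes).
class ScheduledQueueSketch {
    // Assumed fallback when nothing is scheduled, playing the role of SCHEDULE_PURGE_INTERVAL
    static final long FALLBACK_NANOS = TimeUnit.SECONDS.toNanos(1);

    static final class Task implements Comparable<Task> {
        final String name;
        final long deadlineNanos;

        Task(String name, long deadlineNanos) {
            this.name = name;
            this.deadlineNanos = deadlineNanos;
        }

        // Remaining delay, never negative
        long delayNanos(long currentTimeNanos) {
            return Math.max(0L, deadlineNanos - currentTimeNanos);
        }

        @Override
        public int compareTo(Task other) {
            return Long.compare(deadlineNanos, other.deadlineNanos);
        }
    }

    final PriorityQueue<Task> queue = new PriorityQueue<>();

    void schedule(String name, long deadlineNanos) {
        queue.add(new Task(name, deadlineNanos));
    }

    // Same shape as delayNanos above: peek at the earliest task without removing it
    long delayNanos(long currentTimeNanos) {
        Task first = queue.peek();
        if (first == null) {
            return FALLBACK_NANOS;
        }
        return first.delayNanos(currentTimeNanos);
    }

    public static void main(String[] args) {
        ScheduledQueueSketch sketch = new ScheduledQueueSketch();
        long ms = TimeUnit.MILLISECONDS.toNanos(1);
        sketch.schedule("c", 30 * ms);
        sketch.schedule("a", 10 * ms);
        sketch.schedule("b", 20 * ms);
        System.out.println(sketch.queue.peek().name);       // a
        System.out.println(sketch.delayNanos(4 * ms) / ms); // 6
    }
}
```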

Netty's task queues (normal tasks, scheduled tasks and tail tasks) are covered in a separate article, so I won't go into detail here.

2. A task is added during polling: interrupt the polling

for (;;) {
    // 1. A scheduled task's deadline is about to expire: interrupt the polling
    // ...
    // 2. A task was added during polling: interrupt the polling
    if (hasTasks() && wakenUp.compareAndSet(false, true)) {
        selector.selectNow();
        selectCnt = 1;
        break;
    }
    // ...
}

To make sure queued tasks run promptly, Netty checks whether the task queue is empty before performing the blocking select. If the queue is not empty, Netty performs a non-blocking selectNow() and breaks out of the loop.

3. Blocking select operation

for (;;) {
    // 1. A scheduled task's deadline is about to expire: interrupt the polling
    // 2. A task was added during polling: interrupt the polling
    // ...
    // 3. Blocking select operation
    int selectedKeys = selector.select(timeoutMillis);
    selectCnt ++;
    if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
        break;
    }
    // ...
}

Reaching this step means the task queue is empty and the first scheduled task is still at least 0.5 ms away. A blocking select is therefore performed, lasting at most until the deadline of the first scheduled task.

We might ask: if the first scheduled task is due a long time from now, say in an hour, could the thread stay blocked in select for that whole time? It could! However, as soon as a new task is added during that period, the blocking select is released.

An external thread adds a task by calling execute:

@Override
public void execute(Runnable task) {
    // ...
    wakeup(inEventLoop); // inEventLoop is false when called from an external thread
    // ...
}

This calls the wakeup method to wake up the selector:

protected void wakeup(boolean inEventLoop) {
    if (!inEventLoop && wakenUp.compareAndSet(false, true)) {
        selector.wakeup();
    }
}

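As you can see, when an external thread adds a task, wakeup is called. This wakes up the thread blocked in selector.select(timeoutMillis). Because of the compareAndSet, external threads call selector.wakeup() at most once until wakenUp is reset to false at the start of the next loop.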
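The effect of wakeup() can be observed with a bare java.nio.channels.Selector: a thread blocked in select(timeout) returns as soon as another thread calls wakeup(). The hypothetical WakeupDemo below blocks for up to 60 seconds. Meanwhile a second thread calls a wakeup(inEventLoop) method, shaped like the one above, twice in a row. The demo reports how long select blocked and how many times selector.wakeup() was actually invoked.

```java
import java.io.IOException;
import java.nio.channels.Selector;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: an external thread wakes a selector blocked in select(timeout).
class WakeupDemo {
    private final Selector selector;
    private final AtomicBoolean wakenUp = new AtomicBoolean(false);
    private final AtomicInteger wakeupCalls = new AtomicInteger();

    WakeupDemo() throws IOException {
        selector = Selector.open();
    }

    // Same shape as wakeup(inEventLoop) above: only the CAS winner calls selector.wakeup()
    void wakeup(boolean inEventLoop) {
        if (!inEventLoop && wakenUp.compareAndSet(false, true)) {
            wakeupCalls.incrementAndGet();
            selector.wakeup();
        }
    }

    // Returns {milliseconds spent in select, number of real selector.wakeup() calls}
    static long[] demo() throws IOException, InterruptedException {
        WakeupDemo loop = new WakeupDemo();
        loop.wakenUp.set(false); // start of a new loop round, as in run()
        Thread external = new Thread(() -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                return;
            }
            loop.wakeup(false);
            loop.wakeup(false); // skipped: wakenUp is already true
        });
        long start = System.nanoTime();
        external.start();
        loop.selector.select(TimeUnit.SECONDS.toMillis(60)); // would otherwise block up to 60 s
        long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        external.join();
        loop.selector.close();
        return new long[] {elapsedMillis, loop.wakeupCalls.get()};
    }

    public static void main(String[] args) throws Exception {
        long[] result = demo();
        System.out.println("blocked ~" + result[0] + " ms, wakeup calls: " + result[1]);
    }
}
```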

After the blocking select returns, Netty checks a series of conditions to decide whether to stop polling:

  • I/O events were selected (selectedKeys != 0)
  • oldWakenUp, the value wakenUp had before it was reset at the start of this round, is true
  • The task queue has tasks (hasTasks())
  • A scheduled task is ready to run (hasScheduledTasks())
  • The selector was explicitly woken up (wakenUp.get())

Working around the JDK NIO bug

For a description of the bug, see bugs.java.com/bugdatabase…

This bug makes the selector return from select over and over with nothing selected (empty polling). CPU usage climbs to 100% and the NIO server becomes unusable. Strictly speaking, Netty does not fix the JDK bug. It works around it as follows:

long currentTimeNanos = System.nanoTime();
for (;;) {
    // 1. A scheduled task's deadline is about to expire: interrupt the polling
    // 2. A task was added during polling: interrupt the polling
    // 3. Blocking select operation
    selector.select(timeoutMillis);
    selectCnt ++;
    // 4. Work around the JDK NIO bug
    long time = System.nanoTime();
    if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
        selectCnt = 1;
    } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
            selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {

        rebuildSelector();
        selector = this.selector;
        selector.selectNow();
        selectCnt = 1;
        break;
    }
    currentTimeNanos = time;
    // ...
}

Before each selector.select(timeoutMillis), Netty records the start time in currentTimeNanos. After select returns, it records the end time in time. It then checks whether the select lasted at least timeoutMillis milliseconds. (The condition time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos may be easier to read rearranged as time - currentTimeNanos >= TimeUnit.MILLISECONDS.toNanos(timeoutMillis).) If it did, the poll was a normal one and selectCnt is reset to 1. Otherwise the blocking call returned before its timeout, which may be the JDK empty-polling bug at work. Once the number of such premature returns reaches a threshold, 512 by default, the selector is rebuilt.
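The counting logic is easier to follow with simulated timestamps than with a real buggy selector. In the hypothetical EmptyPollDetector below, each call reports one select(timeoutMillis) that returned with no keys. It uses the rearranged comparison, and it increments a counter instead of calling rebuildSelector(). This is a simplified model of the loop above, not Netty's code.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical model of the empty-polling counter above, driven by simulated timestamps.
class EmptyPollDetector {
    private final int rebuildThreshold;
    private int selectCnt;
    int rebuilds;

    EmptyPollDetector(int rebuildThreshold) {
        this.rebuildThreshold = rebuildThreshold;
    }

    // Reports one select(timeoutMillis) that ran from startNanos to endNanos and selected nothing
    void afterEmptySelect(long startNanos, long endNanos, long timeoutMillis) {
        selectCnt++;
        if (endNanos - startNanos >= TimeUnit.MILLISECONDS.toNanos(timeoutMillis)) {
            selectCnt = 1; // blocked for the full timeout: a normal poll
        } else if (rebuildThreshold > 0 && selectCnt >= rebuildThreshold) {
            rebuilds++;    // too many premature returns: stand-in for rebuildSelector()
            selectCnt = 1;
        }
    }

    // Simulates `selects` consecutive empty selects that each blocked for blockedNanos
    static int simulate(int selects, long blockedNanos, long timeoutMillis, int threshold) {
        EmptyPollDetector detector = new EmptyPollDetector(threshold);
        long now = 0L;
        for (int i = 0; i < selects; i++) {
            detector.afterEmptySelect(now, now + blockedNanos, timeoutMillis);
            now += blockedNanos;
        }
        return detector.rebuilds;
    }

    public static void main(String[] args) {
        long oneSecond = TimeUnit.SECONDS.toNanos(1);
        System.out.println(simulate(600, 0L, 1000L, 512));        // 1: every select returned instantly
        System.out.println(simulate(600, oneSecond, 1000L, 512)); // 0: every select blocked for 1 s
        System.out.println(simulate(1023, 0L, 1000L, 512));       // 2
        System.out.println(simulate(600, 0L, 1000L, 0));          // 0: threshold 0 disables rebuilding
    }
}
```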

The code that sets the empty-polling threshold is as follows:

int selectorAutoRebuildThreshold = SystemPropertyUtil.getInt("io.netty.selectorAutoRebuildThreshold", 512);
if (selectorAutoRebuildThreshold < MIN_PREMATURE_SELECTOR_RETURNS) {
    selectorAutoRebuildThreshold = 0;
}
SELECTOR_AUTO_REBUILD_THRESHOLD = selectorAutoRebuildThreshold;
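The same threshold rule can be reproduced with only the JDK, which also shows how the property is read. In the sketch below, RebuildThresholdConfig is hypothetical. The minimum of 3 is an assumed value for illustration, since Netty defines its own MIN_PREMATURE_SELECTOR_RETURNS. Integer.getInteger returns the default of 512 when the property is absent or not a number. With real Netty, the property would be passed to the JVM, for example -Dio.netty.selectorAutoRebuildThreshold=1024. Because the result ends up in the constant SELECTOR_AUTO_REBUILD_THRESHOLD, it presumably has to be set before NioEventLoop is initialized.

```java
// Hypothetical re-implementation of the threshold setup above with plain JDK calls.
class RebuildThresholdConfig {
    // Assumed minimum for illustration; Netty defines its own MIN_PREMATURE_SELECTOR_RETURNS
    static final int MIN_PREMATURE_SELECTOR_RETURNS = 3;

    static int readThreshold() {
        // Falls back to 512 when the property is missing or not a valid integer
        int threshold = Integer.getInteger("io.netty.selectorAutoRebuildThreshold", 512);
        if (threshold < MIN_PREMATURE_SELECTOR_RETURNS) {
            threshold = 0; // 0 turns automatic selector rebuilding off
        }
        return threshold;
    }

    public static void main(String[] args) {
        // 512 unless -Dio.netty.selectorAutoRebuildThreshold=... is passed to the JVM
        System.out.println(readThreshold());
    }
}
```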

Let's briefly look at how rebuildSelector works around the empty-polling bug. The operation is simple: create a new selector and transfer the channels registered with the old selector to it. Stripped down to the main code, the skeleton looks like this:

public void rebuildSelector() {
    final Selector oldSelector = selector;
    final Selector newSelector;
    newSelector = openSelector();

    int nChannels = 0;
    for (;;) {
        try {
            for (SelectionKey key: oldSelector.keys()) {
                Object a = key.attachment();
                if (!key.isValid() || key.channel().keyFor(newSelector) != null) {
                    continue;
                }
                int interestOps = key.interestOps();
                key.cancel();
                SelectionKey newKey = key.channel().register(newSelector, interestOps, a);
                if (a instanceof AbstractNioChannel) {
                    ((AbstractNioChannel) a).selectionKey = newKey;
                }
                nChannels ++;
            }
        } catch (ConcurrentModificationException e) {
            // Probably due to concurrent modification of the key set.
            continue;
        }
        break;
    }
    selector = newSelector;
    oldSelector.close();
}

First, a new selector is created with openSelector(). The transfer then runs inside an infinite loop and starts over whenever a ConcurrentModificationException signals that the key set was modified concurrently.

The transfer steps for each key are:

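  1. Take a valid key that is not yet registered with the new selector
  2. Cancel the key, ending the channel's registration with the old selector
  3. Register the key's channel with the new selector, keeping the same interest set and attachment
  4. Point the Netty channel (the attachment) at the new key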

Once the migration is complete, the old selector is closed and all further polling happens on the new selector.
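The four migration steps work the same way with plain JDK channels. The sketch below is a hypothetical SelectorMigration, not Netty's rebuildSelector(). ChannelHolder stands in for AbstractNioChannel, and a Pipe source stands in for a network channel. The ConcurrentModificationException retry loop is omitted because nothing else touches the selector in this single-threaded demo.

```java
import java.io.IOException;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

// Hypothetical sketch of selector migration with plain JDK NIO (not Netty's rebuildSelector).
class SelectorMigration {
    // Hypothetical stand-in for AbstractNioChannel: remembers its current SelectionKey
    static final class ChannelHolder {
        SelectionKey selectionKey;
    }

    static Selector rebuild(Selector oldSelector) throws IOException {
        Selector newSelector = Selector.open();
        for (SelectionKey key : oldSelector.keys()) {
            Object attachment = key.attachment();
            // 1. Only migrate valid keys not yet registered with the new selector
            if (!key.isValid() || key.channel().keyFor(newSelector) != null) {
                continue;
            }
            int interestOps = key.interestOps();
            // 2. Cancel the registration with the old selector
            key.cancel();
            // 3. Register the same channel, interest set and attachment with the new selector
            SelectionKey newKey = key.channel().register(newSelector, interestOps, attachment);
            // 4. Point the channel wrapper at its new key
            if (attachment instanceof ChannelHolder) {
                ((ChannelHolder) attachment).selectionKey = newKey;
            }
        }
        oldSelector.close(); // the old selector is discarded
        return newSelector;
    }

    static boolean demo() throws IOException {
        Selector oldSelector = Selector.open();
        Pipe pipe = Pipe.open();
        try {
            Pipe.SourceChannel source = pipe.source();
            source.configureBlocking(false);
            ChannelHolder holder = new ChannelHolder();
            holder.selectionKey = source.register(oldSelector, SelectionKey.OP_READ, holder);

            Selector newSelector = rebuild(oldSelector);
            try {
                SelectionKey key = holder.selectionKey;
                return !oldSelector.isOpen()
                        && key.selector() == newSelector
                        && key.isValid()
                        && key.interestOps() == SelectionKey.OP_READ
                        && key.attachment() == holder
                        && source.keyFor(newSelector) == key;
            } finally {
                newSelector.close();
            }
        } finally {
            pipe.source().close();
            pipe.sink().close();
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(demo()); // true
    }
}
```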

To summarize the Reactor thread's select step: it keeps polling for I/O events. During polling, it repeatedly checks for scheduled and normal tasks so that tasks in Netty's task queue are executed promptly. It also keeps a counter to work around the JDK empty-polling bug.

Due to space constraints, the remaining two steps will be covered in a separate article. Stay tuned.

process selected keys

To be continued

run tasks

To be continued

Finally, let's revisit the diagram at the beginning of this article to recap what the Netty Reactor thread does:

  1. Poll for I/O events
  2. Process the polled events
  3. Run the tasks in the task queue

If you want to learn Netty systematically, my booklet “Netty Introduction and actual Practice: Imitation wechat IM instant messaging system” can help. If you want to study Netty's internals systematically, don't miss my Netty source-code analysis video series: Coding.imooc.com/class/230.h…