At the heart of Netty is the Reactor thread, which Netty implements as NioEventLoop. So what happens inside NioEventLoop? How does Netty poll for events efficiently while still running tasks on time? How does it gracefully work around the JDK NIO bug? With these questions in mind, this article walks you step by step through how the Netty Reactor thread really works (source code based on 4.1.6.Final).
Starting the Reactor thread
NioEventLoop's run method is the body of the Reactor thread, and the thread is started when the first task is added. Here is the execute method of NioEventLoop's parent class, SingleThreadEventExecutor:
@Override
public void execute(Runnable task) {
    ...
    boolean inEventLoop = inEventLoop();
    if (inEventLoop) {
        addTask(task);
    } else {
        startThread();
        addTask(task);
        ...
    }
    ...
}
When an external thread adds a task to the Reactor's task queue, it calls startThread(). Netty checks whether the Reactor thread has already been started; if not, it starts the thread, and the task is then added to the queue.
private void startThread() {
    if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
        if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
            doStartThread();
        }
    }
}
In doStartThread, SingleThreadEventExecutor calls the execute method of its executor: the call to NioEventLoop's run method is wrapped in a Runnable and handed to the executor, which runs it on a thread.
private void doStartThread() {
    ...
    executor.execute(new Runnable() {
        @Override
        public void run() {
            thread = Thread.currentThread();
            ...
            SingleThreadEventExecutor.this.run();
            ...
        }
    });
}
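To make the lazy start concrete, here is a minimal JDK-only sketch of the same pattern, assuming a single worker thread that does nothing but drain a queue. LazyStartExecutor and its fields are hypothetical names chosen to mirror the snippets above; this is not Netty's SingleThreadEventExecutor. What it shows is the cheap state read followed by compareAndSet, so that only one caller ever starts the thread, and the worker recording its own thread just as doStartThread does.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of "start the worker thread on the first execute()".
final class LazyStartExecutor {
    private static final int ST_NOT_STARTED = 1;
    private static final int ST_STARTED = 2;

    private final AtomicInteger state = new AtomicInteger(ST_NOT_STARTED);
    private final AtomicInteger threadsStarted = new AtomicInteger();
    private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
    private volatile Thread thread; // set by the worker itself, like doStartThread()

    boolean inEventLoop() {
        return Thread.currentThread() == thread;
    }

    void execute(Runnable task) {
        if (!inEventLoop()) {
            startThread(); // only the first caller actually starts a thread
        }
        taskQueue.offer(task);
    }

    private void startThread() {
        // Plain read first (cheap), then CAS so exactly one caller wins the race.
        if (state.get() == ST_NOT_STARTED
                && state.compareAndSet(ST_NOT_STARTED, ST_STARTED)) {
            threadsStarted.incrementAndGet();
            Thread worker = new Thread(this::run, "lazy-event-loop");
            worker.setDaemon(true);
            worker.start();
        }
    }

    // Stand-in for NioEventLoop.run(): here it only drains the task queue.
    private void run() {
        thread = Thread.currentThread();
        try {
            for (;;) {
                taskQueue.take().run();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    int threadsStarted() {
        return threadsStarted.get();
    }
}
```

Calling execute a hundred times still starts exactly one thread: every later call fails the cheap state check and only enqueues its task.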
This thread is created by the executor, and it is the actual Reactor thread in Netty. The executor defaults to ThreadPerTaskExecutor, which, every time its execute method is called, uses a DefaultThreadFactory to create a new FastThreadLocalThread.
ThreadPerTaskExecutor
public void execute(Runnable command) {
    threadFactory.newThread(command).start();
}
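A JDK-only analogue of ThreadPerTaskExecutor might look like the sketch below. It assumes a hypothetical namedFactory in place of Netty's DefaultThreadFactory and creates ordinary Thread objects rather than FastThreadLocalThread, so treat it as an illustration of the idea (one fresh thread per execute call), not as Netty's code.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical analogue of ThreadPerTaskExecutor: each execute() starts a new thread.
final class PerTaskExecutor implements Executor {
    private final ThreadFactory threadFactory;

    PerTaskExecutor(ThreadFactory threadFactory) {
        if (threadFactory == null) {
            throw new NullPointerException("threadFactory");
        }
        this.threadFactory = threadFactory;
    }

    @Override
    public void execute(Runnable command) {
        threadFactory.newThread(command).start();
    }

    // Hypothetical stand-in for DefaultThreadFactory: plain daemon threads named
    // "<prefix>-<n>" (Netty's factory returns FastThreadLocalThread instead).
    static ThreadFactory namedFactory(String prefix) {
        AtomicInteger next = new AtomicInteger(1);
        return r -> {
            Thread t = new Thread(r, prefix + "-" + next.getAndIncrement());
            t.setDaemon(true);
            return t;
        };
    }
}
```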
Exactly why combining ThreadPerTaskExecutor with DefaultThreadFactory yields a FastThreadLocalThread is not explained in detail here.
In a standard Netty program, NioEventLoopGroup calls the constructor of its superclass, MultithreadEventExecutorGroup:
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
        EventExecutorChooserFactory chooserFactory, Object... args) {
    if (executor == null) {
        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    }
    ...
}
The executor is then passed to each NioEventLoop through newChild:
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
    return new NioEventLoop(this, executor, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
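The hand-off from the group to its children can be sketched as follows. MiniGroup, MiniLoop and the inline default executor are hypothetical stand-ins for MultithreadEventExecutorGroup, NioEventLoop and ThreadPerTaskExecutor; the sketch only shows that a null executor is replaced by a thread-per-task default and that the same executor is passed to every child through newChild.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the group -> newChild(executor) hand-off.
final class MiniGroup {
    static final class MiniLoop {
        final Executor executor; // each child keeps the executor it was given
        MiniLoop(Executor executor) {
            this.executor = executor;
        }
    }

    final Executor executor;
    private final MiniLoop[] children;

    MiniGroup(int nThreads, Executor executor) {
        if (nThreads <= 0) {
            throw new IllegalArgumentException("nThreads: " + nThreads);
        }
        if (executor == null) {
            // Default: one new daemon thread per execute() call.
            AtomicInteger n = new AtomicInteger(1);
            executor = command -> {
                Thread t = new Thread(command, "mini-loop-" + n.getAndIncrement());
                t.setDaemon(true);
                t.start();
            };
        }
        this.executor = executor;
        children = new MiniLoop[nThreads];
        for (int i = 0; i < nThreads; i++) {
            children[i] = newChild(executor);
        }
    }

    // Counterpart of NioEventLoopGroup.newChild(): every loop gets the same executor.
    MiniLoop newChild(Executor executor) {
        return new MiniLoop(executor);
    }

    int size() {
        return children.length;
    }

    MiniLoop child(int i) {
        return children[i];
    }
}
```

In Netty, each NioEventLoop calls execute on this executor only once, when its thread starts, so a shared thread-per-task executor still yields one dedicated thread per loop.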
That covers how the Reactor thread is created and started. To sum up: the Netty Reactor thread is created when the first task is added, the thread object is a FastThreadLocalThread (to be discussed later), and the body it runs is NioEventLoop's run method.
Running the Reactor thread
Now let's focus on NioEventLoop's run method:
@Override
protected void run() {
    for (;;) {
        try {
            switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                case SelectStrategy.CONTINUE:
                    continue;
                case SelectStrategy.SELECT:
                    select(wakenUp.getAndSet(false));
                    if (wakenUp.get()) {
                        selector.wakeup();
                    }
                default:
                    // fallthrough
            }
            processSelectedKeys();
            runAllTasks(...);
        } catch (Throwable t) {
            handleLoopException(t);
        }
        ...
    }
}
Stripped down to its backbone, what the Reactor thread does is very simple, as the following diagram illustrates.
In short, the Reactor thread runs a three-step loop:
1. Poll for I/O events on all channels registered with the Reactor thread's selector
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
    selector.wakeup();
}
2. Process the channels on which network I/O events occurred
processSelectedKeys();
3. Process the task queue
runAllTasks(...);
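The three steps can be reproduced with a plain java.nio.channels.Selector. MiniReactor below is a hypothetical and heavily simplified loop: no scheduled tasks, no ioRatio, no empty-poll workaround, and selected keys are merely drained. It does keep the wakenUp idea: an outside thread that enqueues a task calls selector.wakeup() only if it flips the flag, and the loop never blocks while tasks are already waiting.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, simplified reactor loop: select -> process keys -> run tasks.
final class MiniReactor implements Runnable {
    private final Selector selector;
    private final Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<>();
    private final AtomicBoolean wakenUp = new AtomicBoolean(false);
    private volatile boolean running = true;

    MiniReactor() {
        try {
            selector = Selector.open();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Any thread: enqueue, then wake the selector at most once per round.
    void execute(Runnable task) {
        taskQueue.offer(task);
        if (wakenUp.compareAndSet(false, true)) {
            selector.wakeup();
        }
    }

    void shutdown() {
        running = false;
        selector.wakeup();
    }

    @Override
    public void run() {
        try {
            while (running) {
                wakenUp.set(false); // a new round starts with no wakeup requested
                // Step 1: poll; never block while tasks are already waiting.
                if (taskQueue.isEmpty()) {
                    selector.select(1000);
                } else {
                    selector.selectNow();
                }
                // Step 2: handle channels with ready I/O (none are registered in this demo).
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();
                    handle(key);
                }
                // Step 3: drain the task queue.
                Runnable task;
                while ((task = taskQueue.poll()) != null) {
                    task.run();
                }
            }
            selector.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private void handle(SelectionKey key) {
        // A real loop would accept/read/write here based on key.readyOps().
    }
}
```

Because Selector.wakeup() also affects a select() that has not started yet, a task submitted just before the loop blocks is not stranded: the next select() returns at once.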
Each step is described in detail below.
The select operation
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
    selector.wakeup();
}
wakenUp indicates whether a blocking select operation should be woken up. Before each new round, Netty sets wakenUp to false (wakenUp.getAndSet(false)) to mark the start of a new loop, and passes the previous value into select. The select operation itself breaks down into the following steps.
1. A scheduled task's deadline is about to expire: interrupt the polling
int selectCnt = 0;
long currentTimeNanos = System.nanoTime();
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
for (;;) {
    long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
    if (timeoutMillis <= 0) {
        if (selectCnt == 0) {
            selector.selectNow();
            selectCnt = 1;
        }
        break;
    }
    ...
}
In NioEventLoop, the Reactor thread's select operation is itself a for loop. In the first step of each iteration, if the deadline of the earliest task in the scheduled task queue is less than 0.5 ms away (so timeoutMillis rounds down to 0), the loop breaks. In addition, if no select has been performed yet before breaking out (selectCnt == 0), selectNow() is called once; this method returns immediately without blocking.
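The timeout arithmetic is easy to check by hand. The tiny helper below (SelectTimeout is a hypothetical name) uses exactly the expression from the snippet: adding 500000 ns before dividing by 1000000 rounds the remaining time to the nearest millisecond, so anything under 0.5 ms becomes 0 and triggers the break.

```java
// Hypothetical helper reproducing the timeout computation from the select loop.
final class SelectTimeout {
    private SelectTimeout() {}

    // Remaining time rounded to the nearest millisecond (for non-negative remaining time).
    static long timeoutMillis(long selectDeadLineNanos, long currentTimeNanos) {
        return (selectDeadLineNanos - currentTimeNanos + 500_000L) / 1_000_000L;
    }

    // The loop stops polling when the rounded timeout is not positive.
    static boolean shouldBreak(long selectDeadLineNanos, long currentTimeNanos) {
        return timeoutMillis(selectDeadLineNanos, currentTimeNanos) <= 0;
    }
}
```

For example, 0.4 ms left gives 0 (break), 0.6 ms gives 1, and 1.499999 ms still gives 1.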
Note that Netty's scheduled task queue is ordered by delay, from shortest to longest, and the delayNanos(currentTimeNanos) method returns the delay of the first scheduled task:
protected long delayNanos(long currentTimeNanos) {
    ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
    if (scheduledTask == null) {
        return SCHEDULE_PURGE_INTERVAL;
    }
    return scheduledTask.delayNanos(currentTimeNanos);
}
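A priority queue keyed on the deadline is one way to get the "sorted from small to large" behaviour described above, with the earliest task always at the head. ScheduledQueueSketch is hypothetical: it stores bare deadline values instead of ScheduledFutureTask objects, assumes a one-second fallback for SCHEDULE_PURGE_INTERVAL, and clamps overdue delays to 0.

```java
import java.util.PriorityQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of delayNanos() over a deadline-ordered queue.
final class ScheduledQueueSketch {
    // Assumed fallback when nothing is scheduled (stands in for SCHEDULE_PURGE_INTERVAL).
    static final long SCHEDULE_PURGE_INTERVAL = TimeUnit.SECONDS.toNanos(1);

    // Min-heap of deadlines: peek() is always the task that is due first.
    private final PriorityQueue<Long> deadlines = new PriorityQueue<>();

    void schedule(long deadlineNanos) {
        deadlines.add(deadlineNanos);
    }

    long delayNanos(long currentTimeNanos) {
        Long first = deadlines.peek();
        if (first == null) {
            return SCHEDULE_PURGE_INTERVAL;
        }
        // Overdue tasks report 0 rather than a negative delay.
        return Math.max(0L, first - currentTimeNanos);
    }
}
```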
The details of Netty's task queues (normal tasks, scheduled tasks, and tail tasks) are covered in a separate article, so I won't go into them here.
2. A task was added during polling: interrupt the polling
for (;;) {
    // 1. A scheduled task's deadline is about to expire: interrupt the polling
    ...
    // 2. A task was added: interrupt the polling
    if (hasTasks() && wakenUp.compareAndSet(false, true)) {
        selector.selectNow();
        selectCnt = 1;
        break;
    }
    ...
}
To make sure queued tasks run promptly, Netty checks whether the task queue is empty before it performs the blocking select. If it is not empty and Netty manages to flip wakenUp from false to true, it performs a non-blocking selectNow() and breaks out of the loop.
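Step 2 can be isolated into a small helper, sketched here under the assumption that hasTasks() simply means "the queue is non-empty". TaskCheck, breakForPendingTasks and openSelector are hypothetical names. If the compareAndSet fails, a wakeup has already been requested in this round (for example by an external execute() call), so the helper leaves the decision to the blocking select, which that pending selector.wakeup() cuts short.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of "tasks are waiting: selectNow() once and leave the loop".
final class TaskCheck {
    private TaskCheck() {}

    /** Returns true when the select loop should break because tasks are waiting. */
    static boolean breakForPendingTasks(Queue<Runnable> taskQueue, AtomicBoolean wakenUp, Selector selector) {
        if (!taskQueue.isEmpty() && wakenUp.compareAndSet(false, true)) {
            try {
                selector.selectNow(); // non-blocking, but still picks up any ready I/O
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            return true;
        }
        return false;
    }

    // Demo helper: Selector.open() without a checked exception.
    static Selector openSelector() {
        try {
            return Selector.open();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```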
3. Blocking select operation
for (;;) {
    // 1. A scheduled task's deadline is about to expire: interrupt the polling
    ...
    // 2. A task was added: interrupt the polling
    ...
    // 3. Blocking select operation
    int selectedKeys = selector.select(timeoutMillis);
    selectCnt ++;
    if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
        break;
    }
    ...
}
Reaching this step means Netty's task queue is empty and the first scheduled task is still at least 0.5 ms away. So a blocking select is performed here, lasting at most until the deadline of the first scheduled task.
Ask yourself: if the first scheduled task is far in the future, say an hour away, could the thread stay blocked in select the whole time? It could! However, as soon as a new task is added during that time, the blocking select is released.
An external thread adds a task by calling execute:
@Override
public void execute(Runnable task) {
    ...
    wakeup(inEventLoop); // inEventLoop is false here
    ...
}
The wakeup method then wakes up the selector:
protected void wakeup(boolean inEventLoop) {
    if (!inEventLoop && wakenUp.compareAndSet(false, true)) {
        selector.wakeup();
    }
}
As you can see, when an external thread adds a task, it calls the wakeup method, which wakes up the Reactor thread blocked in selector.select(timeoutMillis).
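The effect of wakeup(inEventLoop) can be observed with a real Selector. WakeupDemo is a hypothetical harness: one thread blocks in select(timeoutMillis) with a long timeout, another thread calls wakeup(false), and the CAS on wakenUp ensures that only the first of several external calls actually reaches selector.wakeup().

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical harness around Selector.wakeup() guarded by a wakenUp flag.
final class WakeupDemo {
    final Selector selector;
    final AtomicBoolean wakenUp = new AtomicBoolean(false);
    private volatile Thread loopThread;

    WakeupDemo() {
        try {
            selector = Selector.open();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    boolean inEventLoop() {
        return Thread.currentThread() == loopThread;
    }

    /** Mirrors wakeup(inEventLoop); returns true if Selector.wakeup() was actually called. */
    boolean wakeup(boolean inEventLoop) {
        if (!inEventLoop && wakenUp.compareAndSet(false, true)) {
            selector.wakeup();
            return true;
        }
        return false;
    }

    /** Blocks in select(timeoutMillis) on the calling thread; returns the time actually blocked, in ms. */
    long timedSelect(long timeoutMillis) {
        loopThread = Thread.currentThread();
        long start = System.nanoTime();
        try {
            selector.select(timeoutMillis);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return (System.nanoTime() - start) / 1_000_000L;
    }
}
```

A second external call in the same round returns false, and a call made from the loop thread itself never touches the selector, since that thread is not blocked in select at that moment.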
After the blocking select returns, Netty checks a series of conditions to decide whether to stop polling. It breaks out of the loop if any of the following holds:
- An I/O event was polled (selectedKeys != 0)
- The oldWakenUp parameter is true
- The task queue has tasks (hasTasks())
- The first scheduled task is due to be executed (hasScheduledTasks())
- The selector was explicitly woken up (wakenUp.get())
Working around the JDK NIO bug
For a description of the bug, see bugs.java.com/bugdatabase…
This bug makes the selector keep returning from select with no ready events (empty polling), driving CPU usage to 100% and leaving the NIO server unable to serve requests. Strictly speaking, Netty does not fix the JDK bug; it works around it as follows:
long currentTimeNanos = System.nanoTime();
for (;;) {
    // 1. A scheduled task's deadline is about to expire: interrupt the polling
    ...
    // 2. A task was added: interrupt the polling
    ...
    // 3. Blocking select operation
    selector.select(timeoutMillis);
    selectCnt ++;
    // 4. Work around the JDK NIO bug
    long time = System.nanoTime();
    if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
        selectCnt = 1;
    } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
            selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
        rebuildSelector();
        selector = this.selector;
        selector.selectNow();
        selectCnt = 1;
        break;
    }
    currentTimeNanos = time;
    ...
}
Before each selector.select(timeoutMillis), Netty records the start time in currentTimeNanos; after select returns, it takes the end time and checks whether the select actually lasted at least timeoutMillis milliseconds. (The condition time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos is easier to read when rewritten as time - currentTimeNanos >= TimeUnit.MILLISECONDS.toNanos(timeoutMillis).) If the select lasted at least timeoutMillis, it was a valid poll and selectCnt is reset. Otherwise the blocking call returned early, which may mean the JDK empty-polling bug has been triggered. When the number of such premature returns reaches a threshold, 512 by default, the selector is rebuilt.
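The counting logic can be separated from the Selector so that it is easy to test. EmptyPollDetector is a hypothetical class that is fed start and end timestamps instead of calling select() itself. It follows the snippet above: selectCnt goes up on every return, a full-length select resets it to 1, and reaching the threshold signals a rebuild.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical premature-return counter modelled on the snippet above.
final class EmptyPollDetector {
    private final int rebuildThreshold; // 0 disables rebuilding
    private int selectCnt;

    EmptyPollDetector(int rebuildThreshold) {
        this.rebuildThreshold = rebuildThreshold;
    }

    /**
     * Records one select(timeoutMillis) that started at startNanos and returned at endNanos.
     * Returns true when the caller should rebuild its selector.
     */
    boolean onSelectReturned(long startNanos, long endNanos, long timeoutMillis) {
        selectCnt++;
        if (endNanos - startNanos >= TimeUnit.MILLISECONDS.toNanos(timeoutMillis)) {
            selectCnt = 1; // blocked for the full timeout: a normal poll
            return false;
        }
        if (rebuildThreshold > 0 && selectCnt >= rebuildThreshold) {
            selectCnt = 1; // the caller rebuilds; start counting afresh
            return true;
        }
        return false;
    }

    int selectCnt() {
        return selectCnt;
    }
}
```

With a threshold of 512, the 512th consecutive premature return is the one that asks for a rebuild; a threshold of 0 disables the check, matching SELECTOR_AUTO_REBUILD_THRESHOLD > 0 in the original condition.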
The code that sets the empty-polling threshold is as follows:
int selectorAutoRebuildThreshold = SystemPropertyUtil.getInt("io.netty.selectorAutoRebuildThreshold", 512);
if (selectorAutoRebuildThreshold < MIN_PREMATURE_SELECTOR_RETURNS) {
    selectorAutoRebuildThreshold = 0;
}
SELECTOR_AUTO_REBUILD_THRESHOLD = selectorAutoRebuildThreshold;
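The same configuration logic can be written with plain JDK calls. Integer.getInteger stands in for Netty's SystemPropertyUtil.getInt, RebuildThreshold is a hypothetical class name, and the value 3 for MIN_PREMATURE_SELECTOR_RETURNS is an assumption of this sketch rather than something shown in the snippet above.

```java
// Hypothetical JDK-only version of the threshold setup.
final class RebuildThreshold {
    static final int MIN_PREMATURE_SELECTOR_RETURNS = 3; // assumed minimum
    static final String PROPERTY = "io.netty.selectorAutoRebuildThreshold";

    private RebuildThreshold() {}

    // Values below the minimum mean "auto-rebuild disabled" (0).
    static int resolve(int configured) {
        return configured < MIN_PREMATURE_SELECTOR_RETURNS ? 0 : configured;
    }

    // Reads the system property, defaulting to 512 when it is absent.
    static int fromSystemProperties() {
        return resolve(Integer.getInteger(PROPERTY, 512));
    }
}
```

In this sketch, starting the JVM with -Dio.netty.selectorAutoRebuildThreshold=1024 raises the threshold, while any value below the minimum turns the rebuild off.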
Let's briefly look at how Netty works around the empty-polling bug with rebuildSelector. The operation is actually very simple: create a new selector and move every channel registered with the old selector over to it. Stripped down to its main code, the skeleton looks like this:
public void rebuildSelector() {
    final Selector oldSelector = selector;
    final Selector newSelector;
    newSelector = openSelector();
    int nChannels = 0;
    for (;;) {
        try {
            for (SelectionKey key: oldSelector.keys()) {
                Object a = key.attachment();
                if (!key.isValid() || key.channel().keyFor(newSelector) != null) {
                    continue;
                }
                int interestOps = key.interestOps();
                key.cancel();
                SelectionKey newKey = key.channel().register(newSelector, interestOps, a);
                if (a instanceof AbstractNioChannel) {
                    ((AbstractNioChannel) a).selectionKey = newKey;
                }
                nChannels ++;
            }
        } catch (ConcurrentModificationException e) {
            // Probably due to concurrent modification of the key set.
            continue;
        }
        break;
    }
    selector = newSelector;
    oldSelector.close();
}
First, a new selector is created with openSelector(). Then an infinite loop performs the transfer, restarting it whenever a ConcurrentModificationException is thrown because the key set was modified concurrently.
The transfer itself works as follows:
- Take a valid key
- Cancel the key so its channel is no longer watched by the old selector
- Register the key's channel with the new selector
- Bind the channel to the new key
Once the migration is complete, the original selector is closed and all subsequent polling happens on the new selector.
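The migration steps map directly onto the JDK Selector API. SelectorRebuild is a hypothetical, JDK-only sketch: it copies each valid key's interest set and attachment to a new selector, restarts the pass on ConcurrentModificationException, and closes the old selector. Unlike Netty, it has no AbstractNioChannel to update, and registerPipe is only a demo helper that registers the read end of a Pipe.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.ConcurrentModificationException;

// Hypothetical JDK-only sketch of moving registrations to a fresh selector.
final class SelectorRebuild {
    private SelectorRebuild() {}

    static Selector open() {
        try {
            return Selector.open();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Moves every valid registration to a new selector and closes the old one. */
    static Selector rebuild(Selector oldSelector) {
        Selector newSelector = open();
        try {
            for (;;) {
                try {
                    for (SelectionKey key : oldSelector.keys()) {
                        Object attachment = key.attachment();
                        // Skip cancelled keys and channels that were already moved.
                        if (!key.isValid() || key.channel().keyFor(newSelector) != null) {
                            continue;
                        }
                        int interestOps = key.interestOps();
                        key.cancel(); // stop watching this channel on the old selector
                        key.channel().register(newSelector, interestOps, attachment); // same ops, same attachment
                    }
                } catch (ConcurrentModificationException e) {
                    continue; // key set changed mid-iteration: run the pass again
                }
                break;
            }
            oldSelector.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return newSelector;
    }

    // Demo helper: registers the read end of a new Pipe for OP_READ.
    static SelectionKey registerPipe(Selector selector, Object attachment) {
        try {
            Pipe.SourceChannel source = Pipe.open().source();
            source.configureBlocking(false);
            return source.register(selector, SelectionKey.OP_READ, attachment);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```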
Finally, let's summarize what the Reactor thread's select step does: it keeps polling for I/O events, and during polling it repeatedly checks for scheduled and ordinary tasks so that tasks in Netty's task queue are executed promptly. Along the way, a counter is used to work around the JDK empty-polling bug. The whole process is clear.
For reasons of space, the remaining two steps will be covered in another article. Stay tuned.
process selected keys
To be continued
run tasks
To be continued
Finally, let's revisit the diagram at the beginning of this article to review what the Netty Reactor thread does:
- Poll for I/O events
- Process the polled events
- Execute the tasks in the task queue
If you want to learn Netty systematically, my small book “Netty Introduction and Practice: Imitating the WeChat IM Instant Messaging System” can help. If you want to study Netty's internals systematically, don't miss my Netty source code analysis video series: Coding.imooc.com/class/230.h…