NioEvenLoop.java
Next trace the RUN method that executes the task business.
@Override
protected void run(a) {
// Execute an infinite loop, which will continue forever if no exceptions occur
for (;;) {
try {
try {
/ / -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- 1 choice in the channel -- -- -- -- -- -- -- -- -- -- -- -- -- --
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE: // NioEventLoop does not support this case
continue;
case SelectStrategy.BUSY_WAIT: // NioEventLoop does not support this case
// fall-through to SELECT since the busy-wait is not supported with NIO
case SelectStrategy.SELECT:
// If there is no task in the current task queue, the case is executed, that is, the blocking selection is performed
select(wakenUp.getAndSet(false));
// 'wakenUp.compareAndSet(false, true)' is always evaluated
// before calling 'selector.wakeup()' to reduce the wake-up
// overhead. (Selector.wakeup() is an expensive operation.)
//
// However, there is a race condition in this approach.
// The race condition is triggered when 'wakenUp' is set to
// true too early.
//
// 'wakenUp' is set to true too early if:
// 1) Selector is waken up between 'wakenUp.set(false)' and
// 'selector.select(...) '. (BAD)
// 2) Selector is waken up between 'selector.select(...) ' and
// 'if (wakenUp.get()) { ... }'. (OK)
//
// In the first case, 'wakenUp' is set to true and the
// following 'selector.select(...) ' will wake up immediately.
// Until 'wakenUp' is set to false again in the next round,
// 'wakenUp.compareAndSet(false, true)' will fail, and therefore
// any attempt to wake up the Selector will fail, too, causing
// the following 'selector.select(...) ' call to block
// unnecessarily.
//
// To fix this problem, we wake up the selector again if wakenUp
// is true immediately after selector.select(...) .
// It is inefficient in that it wakes up the selector for both
// the first case (BAD - wake-up required) and the second case
// (OK - no wake-up required).
if (wakenUp.get()) {
// Get the result of the selection immediately
selector.wakeup();
}
// fall through
default:}}catch (IOException e) {
// If we receive an IOException here its because the Selector is messed up. Let's rebuild
// the selector and retry. https://github.com/netty/netty/issues/8566
rebuildSelector0();
handleLoopException(e);
continue;
}
/ / -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- 2 processing in place of the channel IO -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- --
cancelledKeys = 0;
needsToSelectAgain = false;
// the value ranges from (0,100)
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.runAllTasks(); }}else {
// Get the current time, which is the point at which channel-ready IO starts executing
final long ioStartTime = System.nanoTime();
try {
// Handle the IO of the ready channel
processSelectedKeys();
} finally {
/ / -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- 3 processing tasks in the queue task -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- --
// Ensure we always run tasks.
// Calculate the time it takes to process the CHANNEL's IO
final long ioTime = System.nanoTime() - ioStartTime;
// ioTime * (100 - ioRatio)/ioRatio indicates the duration of a task in the task queue
runAllTasks(ioTime * (100- ioRatio) / ioRatio); }}}catch (Throwable t) {
handleLoopException(t);
}
// Always handle shutdown even if the loop processing threw an exception.
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return; }}}catch(Throwable t) { handleLoopException(t); }}// end-for
}
// Perform blocking selection
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
try {
// counter:
// is 0, indicating that no ready channels are currently selected
// Non-0 indicates that the selection operation has been performed
int selectCnt = 0;
// The current time, which is also the start time of the first round of the for loop
long currentTimeNanos = System.nanoTime();
// delayNanos() indicates how long it is before the first scheduled task in the scheduled task queue starts execution
// selectDeadLineNanos specifies the time when the first scheduled task in the scheduled task queue starts to execute
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
for (;;) {
/ / -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- 1 handle timing tasks in the queue immediately to the first timing task execution time -- -- -- -- -- -- -- -- -- -- -
// 500000L indicates 0.5 ms. 1000000L indicates 1 ms
// How long is "immediately"? Less than 0.5 ms
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
if (timeoutMillis <= 0) { // True indicates that there is an immediate task to execute
// Before ending the current select() method, check to see if there are channels in place
if (selectCnt == 0) {
// Non-blocking selection
selector.selectNow();
selectCnt = 1;
}
break;
}
// There is no scheduled task to execute
// ------------------- 2 Handle the situation where new tasks are added -----------
// If a task was submitted when wakenUp value was true, the task didn't get a chance to call
// Selector#wakeup. So we need to check task queue again before executing select operation.
// If we don't, the task might be pended until select operation was timed out.
// It might be pended until idle timeout if IdleStateHandler existed in pipeline.
// The code can run the current select() method because there is no task to execute. But just because there wasn't,
// Doesn't mean it hasn't been there. This is the case where you currently have a new task
// If there is a task, the task must be executed first, so the current select() is executed. But before WE go,
// Check to see if there are ready channels
if (hasTasks() && wakenUp.compareAndSet(false.true)) {
// Non-blocking selection
selector.selectNow();
selectCnt = 1;
break; // End the current select()
}
// The code has reached this point indicating that there are no scheduled tasks to be executed and no new tasks to be added
// ------------------- 3 Handle blocking selection cases -----------
// Block selection, where the block is awakened under five conditions:
// 1) Find ready channel
// 2) The selector wakeup() method is called
// 3) The current thread is interrupted
// 4) The blocking time times out
// 5) When a large number of empty polling occurs, the CPU usage will spike sharply.
// This method terminates prematurely
int selectedKeys = selector.select(timeoutMillis);
selectCnt ++;
// If there is a ready channel, wakeup() method is called, or there is a task, end the select() function.
if(selectedKeys ! =0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
// - Selected something,
// - waken up by user, or
// - the task queue has a pending task.
// - a scheduled task is ready for processing
break;
}
The end of selector. Select () has ruled out cases 1) and 2)
if (Thread.interrupted()) {
// Thread was interrupted so reset selected keys and break so we not run into a busy loop.
// As this is most likely a bug in the handler of the user or it's client library we will
// also log it.
//
// See https://github.com/netty/netty/issues/2426
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely because " +
"Thread.currentThread().interrupt() was called. Use " +
"NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
}
selectCnt = 1;
break;
}
// The end of selector. Select () has ruled out the first, second, and third cases,
Selector. Select () can only end in one of 4) or 5) ways
// ------------------- 4 Fix NIO Bug -----------
long time = System.nanoTime();
// If this condition is true, it means that selector. Select () ends because of case 4
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
// timeoutMillis elapsed without anything selected.
selectCnt = 1;
// If this condition is false, it means that selector. Select () ends because of case 5
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
// The code exists in an extra method to ensure the method is not too big to inline as this
// branch is not very likely to get hit very frequently.
/ / refactoring seletor
selector = selectRebuildSelector(selectCnt);
selectCnt = 1;
break;
}
// Update currentTimeNanos to the current time, i.e. record the point at which a for loop starts
currentTimeNanos = time;
} // end-for
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector); }}}catch (CancelledKeyException e) {
if (logger.isDebugEnabled()) {
logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
selector, e);
}
// Harmless exception - log anyway}}private void processSelectedKeys(a) {
// If set is not null,
if(selectedKeys ! =null) {
// Handle the optimized SelectedKeys
processSelectedKeysOptimized();
} else {
// Handle the normal SelectedKeys collection
// selectedKeys() is a setprocessSelectedKeysPlain(selector.selectedKeys()); }}private void processSelectedKeysOptimized(a) {
for (int i = 0; i < selectedKeys.size; ++i) {
final SelectionKey k = selectedKeys.keys[i];
// null out entry in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
// Assign null for two reasons:
// 1) Facilitate GC when a channel is closed
// 2) Avoid repeated processing
selectedKeys.keys[i] = null;
// Get the attachment in the key, the attachment can write any data,
// However, for NioEventLoop, the attachment stores the native channel
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
/ / selectedKey processing
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
if (needsToSelectAgain) {
// null out entries in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys.reset(i + 1);
selectAgain();
i = -1; }}}private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
// Handle the key failure
if(! k.isValid()) {final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
// If the channel implementation throws an exception because there is no event loop, we ignore this
// because we are only trying to determine if ch is registered to this event loop and thus has authority
// to close ch.
return;
}
// Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
// and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
// still healthy and should not be closed.
// See https://github.com/netty/netty/issues/5125
if(eventLoop ! =this || eventLoop == null) {
return;
}
// close the channel if the key is not valid anymore
unsafe.close(unsafe.voidPromise());
return;
}
try {
// Get the ready event for the current selectionKey (ready channel)
int readyOps = k.readyOps();
// We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
// the NIO JDK channel implementation may throw a NotYetConnectedException.
// Handle the connection ready condition (this condition does not occur on the Server side, only the Client can initiate the connection request to the Server)
// If the connection succeeds after the first connection request is made, the connection succeeds; Otherwise, the connection is ready
// When the connection is ready, the channel will be selected by selector. Once the channel is selected, the IO is ready to process,
// Connection-ready IO is to connect to the Server and complete the connection
if((readyOps & SelectionKey.OP_CONNECT) ! =0) {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
// Get the ready event concerned by the current channel registered in the current selectionKey
int ops = k.interestOps();
// Place the connection ready position 0 in ops
ops &= ~SelectionKey.OP_CONNECT;
// Write the ops set to 0 into the selectionKey, indicating that the current connection is ready
k.interestOps(ops);
// Complete the connection
unsafe.finishConnect();
}
// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
// Handle write ready
// What is write ready? When writeAndFlush() is called and all data to be sent is written to the user cache,
// The write operation is ready
// When the write operation is ready, the ready IO can be processed. Write ready IO is used to write data from the user cache to the nic cache
if((readyOps & SelectionKey.OP_WRITE) ! =0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
// Handle read ready and receive connection ready
// What is readready? The client has written data to the nic cache over the network.
// What is receive connection ready? The client has written the connection request to the nic cache over the network.
// Therefore, receive connection-ready is a special case of read readiness. Therefore, whether read or receive connection-ready,
// Once ready, ready IO can be processed. The ready IO reads data (possibly connection requests) from the network card cache
// to the user cache, which appears in the MSG variable of the channelRead() method
// A readyOps value of 0 indicates that no channels are ready.
// Two reasons:
// 1) To avoid NIO bugs
// 2) In order to make the IO execution time of not ready channel longer, so that the execution time of later tasks in the task queue is not too short
if((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) ! =0 || readyOps == 0) { unsafe.read(); }}catch(CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); }} handles the normal SelectedKeys collectionprivate void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
// check if the set is empty and if so just return to not create garbage by
// creating a new Iterator every time even if there is nothing to process.
// See https://github.com/netty/netty/issues/597
if (selectedKeys.isEmpty()) {
return;
}
// Iteration collection
Iterator<SelectionKey> i = selectedKeys.iterator();
for (;;) {
final SelectionKey k = i.next();
final Object a = k.attachment();
i.remove();
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
if(! i.hasNext()) {break;
}
if (needsToSelectAgain) {
selectAgain();
selectedKeys = selector.selectedKeys();
// Create the iterator again to avoid ConcurrentModificationException
if (selectedKeys.isEmpty()) {
break;
} else{ i = selectedKeys.iterator(); }}}}Copy the code
The runAllTasks() method in RUN is the one that actually executes the task
SingleThreadEventExecutor.java
/**
* Poll all tasks from the task queue and run them via {@link Runnable#run()} method. This method stops running
* the tasks in the task queue and returns if it ran longer than {@code timeoutNanos}.
*/
protected boolean runAllTasks(long timeoutNanos) {
// remove all tasks that need to be executed immediately from the scheduled taskQueue and place them in the taskQueue
fetchFromScheduledTaskQueue();
// Get a task from the task queue
Runnable task = pollTask();
// If the task is null, the end task is performed
if (task == null) {
afterRunningAllTasks();
return false;
}
// If the task is not empty
// Calculate the end time of the tasks in the task queue
final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
/ / counter
long runTasks = 0;
long lastExecutionTime;
for (;;) {
// Execute the task
safeExecute(task);
runTasks ++;
// Check timeout every 64 tasks because nanoTime() is relatively expensive.
// XXX: Hard-coded value - will make it configurable if it is really a problem.
// The execution time of the tasks in the task queue is not precisely controlled
if ((runTasks & 0x3F) = =0) { // runTasks % 64 == 0
lastExecutionTime = ScheduledFutureTask.nanoTime();
if (lastExecutionTime >= deadline) {
break;
}
}
task = pollTask();
if (task == null) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
break; }}// end-for
// Execute tasks in the end task queue
afterRunningAllTasks();
this.lastExecutionTime = lastExecutionTime;
return true;
}
Copy the code
AbstractEventExecutor.java
/**
* Try to execute the given {@link Runnable} and just log if it throws a {@link Throwable}.
*/
protected static void safeExecute(Runnable task) {
try {
// The actual execution of the task takes place here
task.run();
} catch (Throwable t) {
logger.warn("A task raised an exception. Task: {}", task, t); }}Copy the code
AfterRunningAllTasks ()
SingleThreadEventLoop.java
@Override
protected void afterRunningAllTasks(a) {
runAllTasksFrom(tailTasks);
}
Copy the code
SingleThreadEventExecutor.java
/**
* Runs all tasks from the passed {@code taskQueue}.
*
* @param taskQueue To poll and execute all tasks.
*
* @return {@code true} if at least one task was executed.
*/
protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {
// Get a task from the task queue
Runnable task = pollTaskFrom(taskQueue);
if (task == null) {
return false;
}
for (;;) {
// Execute the task
safeExecute(task);
task = pollTaskFrom(taskQueue);
if (task == null) {
return true; }}}protected static Runnable pollTaskFrom(Queue<Runnable> taskQueue) {
// Retrieves a non-wake task, i.e. an empty task, or an identifier task, from the task queue
for (;;) {
Runnable task = taskQueue.poll();
if (task == WAKEUP_TASK) {
continue;
}
returntask; }}Copy the code