NioEventLoop.java

Next, trace the run() method, which performs channel selection, I/O processing and task execution.

@Override
protected void run(a) {
    // Execute an infinite loop, which will continue forever if no exceptions occur
    for (;;) {
        try {
            try {
                / / -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- 1 choice in the channel -- -- -- -- -- -- -- -- -- -- -- -- -- --
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                case SelectStrategy.CONTINUE:  // NioEventLoop does not support this case
                    continue;

                case SelectStrategy.BUSY_WAIT:  // NioEventLoop does not support this case
                    // fall-through to SELECT since the busy-wait is not supported with NIO

                case SelectStrategy.SELECT:
                    // If there is no task in the current task queue, the case is executed, that is, the blocking selection is performed
                    select(wakenUp.getAndSet(false));

                    // 'wakenUp.compareAndSet(false, true)' is always evaluated
                    // before calling 'selector.wakeup()' to reduce the wake-up
                    // overhead. (Selector.wakeup() is an expensive operation.)
                    //
                    // However, there is a race condition in this approach.
                    // The race condition is triggered when 'wakenUp' is set to
                    // true too early.
                    //
                    // 'wakenUp' is set to true too early if:
                    // 1) Selector is waken up between 'wakenUp.set(false)' and
                    // 'selector.select(...) '. (BAD)
                    // 2) Selector is waken up between 'selector.select(...) ' and
                    // 'if (wakenUp.get()) { ... }'. (OK)
                    //
                    // In the first case, 'wakenUp' is set to true and the
                    // following 'selector.select(...) ' will wake up immediately.
                    // Until 'wakenUp' is set to false again in the next round,
                    // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
                    // any attempt to wake up the Selector will fail, too, causing
                    // the following 'selector.select(...) ' call to block
                    // unnecessarily.
                    //
                    // To fix this problem, we wake up the selector again if wakenUp
                    // is true immediately after selector.select(...) .
                    // It is inefficient in that it wakes up the selector for both
                    // the first case (BAD - wake-up required) and the second case
                    // (OK - no wake-up required).

                    if (wakenUp.get()) {
                        // Get the result of the selection immediately
                        selector.wakeup();
                    }
                    // fall through
                default:}}catch (IOException e) {
                // If we receive an IOException here its because the Selector is messed up. Let's rebuild
                // the selector and retry. https://github.com/netty/netty/issues/8566
                rebuildSelector0();
                handleLoopException(e);
                continue;
            }

            / / -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- 2 processing in place of the channel IO -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- --
            cancelledKeys = 0;
            needsToSelectAgain = false;
            // the value ranges from (0,100)
            final int ioRatio = this.ioRatio;
            if (ioRatio == 100) {
                try {
                    processSelectedKeys();
                } finally {
                    // Ensure we always run tasks.runAllTasks(); }}else {
                // Get the current time, which is the point at which channel-ready IO starts executing
                final long ioStartTime = System.nanoTime();
                try {
                    // Handle the IO of the ready channel
                    processSelectedKeys();
                } finally {
                    / / -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- 3 processing tasks in the queue task -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- --
                    // Ensure we always run tasks.
                    // Calculate the time it takes to process the CHANNEL's IO
                    final long ioTime = System.nanoTime() - ioStartTime;
                    // ioTime * (100 - ioRatio)/ioRatio indicates the duration of a task in the task queue
                    runAllTasks(ioTime * (100- ioRatio) / ioRatio); }}}catch (Throwable t) {
            handleLoopException(t);
        }
        // Always handle shutdown even if the loop processing threw an exception.
        try {
            if (isShuttingDown()) {
                closeAll();
                if (confirmShutdown()) {
                    return; }}}catch(Throwable t) { handleLoopException(t); }}// end-for
}


	// Perform blocking selection
    private void select(boolean oldWakenUp) throws IOException {
        Selector selector = this.selector;
        try {
            // counter:
            // is 0, indicating that no ready channels are currently selected
            // Non-0 indicates that the selection operation has been performed
            int selectCnt = 0;
            // The current time, which is also the start time of the first round of the for loop
            long currentTimeNanos = System.nanoTime();
            // delayNanos() indicates how long it is before the first scheduled task in the scheduled task queue starts execution
            // selectDeadLineNanos specifies the time when the first scheduled task in the scheduled task queue starts to execute
            long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);

            for (;;) {
                / / -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- 1 handle timing tasks in the queue immediately to the first timing task execution time -- -- -- -- -- -- -- -- -- -- -
                // 500000L indicates 0.5 ms. 1000000L indicates 1 ms
                // How long is "immediately"? Less than 0.5 ms
                long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
                if (timeoutMillis <= 0) {  // True indicates that there is an immediate task to execute
                    // Before ending the current select() method, check to see if there are channels in place
                    if (selectCnt == 0) {
                        // Non-blocking selection
                        selector.selectNow();
                        selectCnt = 1;
                    }
                    break;
                }

                // There is no scheduled task to execute
                // ------------------- 2 Handle the situation where new tasks are added -----------
                // If a task was submitted when wakenUp value was true, the task didn't get a chance to call
                // Selector#wakeup. So we need to check task queue again before executing select operation.
                // If we don't, the task might be pended until select operation was timed out.
                // It might be pended until idle timeout if IdleStateHandler existed in pipeline.
                // The code can run the current select() method because there is no task to execute. But just because there wasn't,
                // Doesn't mean it hasn't been there. This is the case where you currently have a new task
                // If there is a task, the task must be executed first, so the current select() is executed. But before WE go,
                // Check to see if there are ready channels
                if (hasTasks() && wakenUp.compareAndSet(false.true)) {
                    // Non-blocking selection
                    selector.selectNow();
                    selectCnt = 1;
                    break;  // End the current select()
                }

                // The code has reached this point indicating that there are no scheduled tasks to be executed and no new tasks to be added
                // ------------------- 3 Handle blocking selection cases -----------
                // Block selection, where the block is awakened under five conditions:
                // 1) Find ready channel
                // 2) The selector wakeup() method is called
                // 3) The current thread is interrupted
                // 4) The blocking time times out
                // 5) When a large number of empty polling occurs, the CPU usage will spike sharply.
                // This method terminates prematurely
                int selectedKeys = selector.select(timeoutMillis);

                selectCnt ++;
                // If there is a ready channel, wakeup() method is called, or there is a task, end the select() function.
                if(selectedKeys ! =0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
                    // - Selected something,
                    // - waken up by user, or
                    // - the task queue has a pending task.
                    // - a scheduled task is ready for processing
                    break;
                }
                The end of selector. Select () has ruled out cases 1) and 2)
                if (Thread.interrupted()) {
                    // Thread was interrupted so reset selected keys and break so we not run into a busy loop.
                    // As this is most likely a bug in the handler of the user or it's client library we will
                    // also log it.
                    //
                    // See https://github.com/netty/netty/issues/2426
                    if (logger.isDebugEnabled()) {
                        logger.debug("Selector.select() returned prematurely because " +
                                "Thread.currentThread().interrupt() was called. Use " +
                                "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
                    }
                    selectCnt = 1;
                    break;
                }

                // The end of selector. Select () has ruled out the first, second, and third cases,
                Selector. Select () can only end in one of 4) or 5) ways
                // ------------------- 4 Fix NIO Bug -----------
                long time = System.nanoTime();

                // If this condition is true, it means that selector. Select () ends because of case 4
                if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                    // timeoutMillis elapsed without anything selected.
                    selectCnt = 1;

                    // If this condition is false, it means that selector. Select () ends because of case 5
                } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                        selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                    // The code exists in an extra method to ensure the method is not too big to inline as this
                    // branch is not very likely to get hit very frequently.
                    / / refactoring seletor
                    selector = selectRebuildSelector(selectCnt);
                    selectCnt = 1;
                    break;
                }
                // Update currentTimeNanos to the current time, i.e. record the point at which a for loop starts
                currentTimeNanos = time;
            }  // end-for

            if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                            selectCnt - 1, selector); }}}catch (CancelledKeyException e) {
            if (logger.isDebugEnabled()) {
                logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                        selector, e);
            }
            // Harmless exception - log anyway}}private void processSelectedKeys(a) {
        // If set is not null,
        if(selectedKeys ! =null) {
            // Handle the optimized SelectedKeys
            processSelectedKeysOptimized();
        } else {
            // Handle the normal SelectedKeys collection
            // selectedKeys() is a setprocessSelectedKeysPlain(selector.selectedKeys()); }}private void processSelectedKeysOptimized(a) {
        for (int i = 0; i < selectedKeys.size; ++i) {
            final SelectionKey k = selectedKeys.keys[i];
            // null out entry in the array to allow to have it GC'ed once the Channel close
            // See https://github.com/netty/netty/issues/2363
            // Assign null for two reasons:
            // 1) Facilitate GC when a channel is closed
            // 2) Avoid repeated processing
            selectedKeys.keys[i] = null;
            // Get the attachment in the key, the attachment can write any data,
            // However, for NioEventLoop, the attachment stores the native channel
            final Object a = k.attachment();

            if (a instanceof AbstractNioChannel) {
                / / selectedKey processing
                processSelectedKey(k, (AbstractNioChannel) a);
            } else {
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }

            if (needsToSelectAgain) {
                // null out entries in the array to allow to have it GC'ed once the Channel close
                // See https://github.com/netty/netty/issues/2363
                selectedKeys.reset(i + 1);

                selectAgain();
                i = -1; }}}private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        // Handle the key failure
        if(! k.isValid()) {final EventLoop eventLoop;
            try {
                eventLoop = ch.eventLoop();
            } catch (Throwable ignored) {
                // If the channel implementation throws an exception because there is no event loop, we ignore this
                // because we are only trying to determine if ch is registered to this event loop and thus has authority
                // to close ch.
                return;
            }
            // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
            // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
            // still healthy and should not be closed.
            // See https://github.com/netty/netty/issues/5125
            if(eventLoop ! =this || eventLoop == null) {
                return;
            }
            // close the channel if the key is not valid anymore
            unsafe.close(unsafe.voidPromise());
            return;
        }

        try {
            // Get the ready event for the current selectionKey (ready channel)
            int readyOps = k.readyOps();
            // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
            // the NIO JDK channel implementation may throw a NotYetConnectedException.
            // Handle the connection ready condition (this condition does not occur on the Server side, only the Client can initiate the connection request to the Server)
            // If the connection succeeds after the first connection request is made, the connection succeeds; Otherwise, the connection is ready
            // When the connection is ready, the channel will be selected by selector. Once the channel is selected, the IO is ready to process,
            // Connection-ready IO is to connect to the Server and complete the connection
            if((readyOps & SelectionKey.OP_CONNECT) ! =0) {
                // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                // See https://github.com/netty/netty/issues/924
                // Get the ready event concerned by the current channel registered in the current selectionKey
                int ops = k.interestOps();
                // Place the connection ready position 0 in ops
                ops &= ~SelectionKey.OP_CONNECT;
                // Write the ops set to 0 into the selectionKey, indicating that the current connection is ready
                k.interestOps(ops);
                // Complete the connection
                unsafe.finishConnect();
            }

            // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
            // Handle write ready
            // What is write ready? When writeAndFlush() is called and all data to be sent is written to the user cache,
            // The write operation is ready
            // When the write operation is ready, the ready IO can be processed. Write ready IO is used to write data from the user cache to the nic cache
            if((readyOps & SelectionKey.OP_WRITE) ! =0) {
                // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                ch.unsafe().forceFlush();
            }

            // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
            // to a spin loop
            // Handle read ready and receive connection ready
            // What is readready? The client has written data to the nic cache over the network.
            // What is receive connection ready? The client has written the connection request to the nic cache over the network.
            // Therefore, receive connection-ready is a special case of read readiness. Therefore, whether read or receive connection-ready,
            // Once ready, ready IO can be processed. The ready IO reads data (possibly connection requests) from the network card cache
            // to the user cache, which appears in the MSG variable of the channelRead() method

            // A readyOps value of 0 indicates that no channels are ready.
            // Two reasons:
            // 1) To avoid NIO bugs
            // 2) In order to make the IO execution time of not ready channel longer, so that the execution time of later tasks in the task queue is not too short
            if((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) ! =0 || readyOps == 0) { unsafe.read(); }}catch(CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); }} handles the normal SelectedKeys collectionprivate void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
        // check if the set is empty and if so just return to not create garbage by
        // creating a new Iterator every time even if there is nothing to process.
        // See https://github.com/netty/netty/issues/597
        if (selectedKeys.isEmpty()) {
            return;
        }
        // Iteration collection
        Iterator<SelectionKey> i = selectedKeys.iterator();
        for (;;) {
            final SelectionKey k = i.next();
            final Object a = k.attachment();
            i.remove();

            if (a instanceof AbstractNioChannel) {
                processSelectedKey(k, (AbstractNioChannel) a);
            } else {
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }

            if(! i.hasNext()) {break;
            }

            if (needsToSelectAgain) {
                selectAgain();
                selectedKeys = selector.selectedKeys();

                // Create the iterator again to avoid ConcurrentModificationException
                if (selectedKeys.isEmpty()) {
                    break;
                } else{ i = selectedKeys.iterator(); }}}}Copy the code

The runAllTasks() method called from run() is the one that actually executes the queued tasks.

SingleThreadEventExecutor.java

/**
 * Poll all tasks from the task queue and run them via {@link Runnable#run()} method.  This method stops running
 * the tasks in the task queue and returns if it ran longer than {@code timeoutNanos}.
 */
protected boolean runAllTasks(long timeoutNanos) {
    // remove all tasks that need to be executed immediately from the scheduled taskQueue and place them in the taskQueue
    fetchFromScheduledTaskQueue();
    // Get a task from the task queue
    Runnable task = pollTask();
    // If the task is null, the end task is performed
    if (task == null) {
        afterRunningAllTasks();
        return false;
    }

    // If the task is not empty
    // Calculate the end time of the tasks in the task queue
    final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
    / / counter
    long runTasks = 0;
    long lastExecutionTime;
    for (;;) {
        // Execute the task
        safeExecute(task);

        runTasks ++;

        // Check timeout every 64 tasks because nanoTime() is relatively expensive.
        // XXX: Hard-coded value - will make it configurable if it is really a problem.
        // The execution time of the tasks in the task queue is not precisely controlled
        if ((runTasks & 0x3F) = =0) {   // runTasks % 64 == 0
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            if (lastExecutionTime >= deadline) {
                break;
            }
        }

        task = pollTask();
        if (task == null) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            break; }}// end-for

    // Execute tasks in the end task queue
    afterRunningAllTasks();
    this.lastExecutionTime = lastExecutionTime;
    return true;
}
Copy the code

AbstractEventExecutor.java

/**
 * Try to execute the given {@link Runnable} and just log if it throws a {@link Throwable}.
 */
protected static void safeExecute(Runnable task) {
    try {
        // The actual execution of the task takes place here
        task.run();
    } catch (Throwable t) {
        logger.warn("A task raised an exception. Task: {}", task, t); }}Copy the code

The afterRunningAllTasks() method:

SingleThreadEventLoop.java

@Override
protected void afterRunningAllTasks(a) {
    runAllTasksFrom(tailTasks);
}
Copy the code

SingleThreadEventExecutor.java

/**
 * Runs all tasks from the passed {@code taskQueue}.
 *
 * @param taskQueue To poll and execute all tasks.
 *
 * @return {@code true} if at least one task was executed.
 */
protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {
    // Get a task from the task queue
    Runnable task = pollTaskFrom(taskQueue);
    if (task == null) {
        return false;
    }
    for (;;) {
        // Execute the task
        safeExecute(task);
        task = pollTaskFrom(taskQueue);
        if (task == null) {
            return true; }}}protected static Runnable pollTaskFrom(Queue<Runnable> taskQueue) {
        // Retrieves a non-wake task, i.e. an empty task, or an identifier task, from the task queue
        for (;;) {
            Runnable task = taskQueue.poll();
            if (task == WAKEUP_TASK) {
                continue;
            }
            returntask; }}Copy the code