preface

Netty serial design avoids thread competition. The core of the design is EventLoop, which associates each object that needs to be synchronized with a corresponding EventLoop thread to handle the tasks generated by the object. This chapter takes a closer look at EventLoop.

NioEventLoop:

  • How Netty implements its Future and Promise (AbstractEventExecutor) through the JDK’s thread pool abstraction.
  • There are three task queues: scheduled task queue, common task queue and tail task queue.
  • How to bind with EventLoop thread (SingleThreadEventExecutor).

On the other hand, learn the duties and execution of NioEventLoop:

  • Performance optimization for JDKSelector.
  • Fix JDK empty polling BUG.
  • Event loop execution process SELECT ->processSelectionKey->runAllTasks.

EventLoop inheritance structure

1, abstract

First, the top layer of the abstract interface is the EventExecutorGroup.

  • Responsible for providing EventExecutor(s), and selecting the next EventExecutor(which is a subinterface of the EventExecutorGroup) via the Next method.
  • Manages the life cycle of EventExecutor(s), providing the shutdown of all EventExecutor services.
public interface EventExecutorGroup extends ScheduledExecutorService.可迭代<EventExecutor> {
    // Returns true when all EventExecutors are closed or have been closed
    boolean isShuttingDown(a);
		// Gracefully close all EventExecutorFuture<? > shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit);
    // Return an EventExecutor managed by the current Group
    EventExecutor next(a);
}
Copy the code

EventExecutor continues to extend the EventExecutorGroup interface

  • For special EventExecutorGroups, the next method returns only itself.
  • Determines whether the given thread/current thread is in the same thread as the EventLoop.
  • Construct the Promise and Future instances bound to the current EventExecutor.
public interface EventExecutor extends EventExecutorGroup {
    // return to itself
    @Override
    EventExecutor next(a);
		// Determine whether the given thread/current thread is in the same thread as EventLoop
    boolean inEventLoop(a);
    boolean inEventLoop(Thread thread);
		// Construct the Promise and Future instances bound to the current EventExecutor
    <V> Promise<V> newPromise(a);
    <V> ProgressivePromise<V> newProgressivePromise(a);
    <V> Future<V> newSucceededFuture(V result);
    <V> Future<V> newFailedFuture(Throwable cause);
}
Copy the code

The EventLoopGroup is parallel to the EventExecutor, extending the EventExecutorGroup interface, and is a special EventExecutorGroup that supports Channel registration.

public interface EventLoopGroup extends EventExecutorGroup {
    ChannelFuture register(Channel channel);
    ChannelFuture register(ChannelPromise promise);
}
Copy the code

Combining EventExecutor and EventLoopGroup, it is not difficult to see the binding relationship between Channel<->EventLoop<->Thread.

OrderedEventExecutor marks the interface and marks EventExecutor to process all tasks in order.

/**
 * Marker interface for {@link EventExecutor}s that will process all submitted tasks in an ordered / serial fashion.
 */
public interface OrderedEventExecutor extends EventExecutor {}Copy the code

An EventLoop can also be thought of as a tag interface that inherits the EventLoopGroup interface and registers a Channel and handles all IO operations on the Channel. An EventLoop instance tends to manage multiple channels.

/**
 * Will handle all the I/O operations for a {@link Channel} once registered.
 * One {@link EventLoop} instance will usually handle more than one {@link Channel} but this may depend on implementation details and internals.
 */
public interface EventLoop extends OrderedEventExecutor.EventLoopGroup {
    @Override
    EventLoopGroup parent(a);
}
Copy the code

2, implementation,

AbstractEventExecutor

AbstractEventExecutor is a top-level abstract class (minus the JDK) that implements some general methods, some of which are posted here.

public abstract class AbstractEventExecutor extends AbstractExecutorService implements EventExecutor {
    private final EventExecutorGroup parent;
    protected AbstractEventExecutor(EventExecutorGroup parent) {
        this.parent = parent;
    }
    @Override
    public EventExecutorGroup parent(a) {
        return parent;
    }
    @Override
    public EventExecutor next(a) {
        return this;
    }
    @Override
    public boolean inEventLoop(a) {
        return inEventLoop(Thread.currentThread());
    }
    @Override
    publicFuture<? > shutdownGracefully() {return shutdownGracefully(DEFAULT_SHUTDOWN_QUIET_PERIOD, DEFAULT_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS);
    }
    @Override
    public <V> Promise<V> newPromise(a) {
        return new DefaultPromise<V>(this); }}Copy the code

AbstractEventExecutor also inherits the JDK’s Abstractions AbstractExecutorService thread pool class to use the JDK’s abstraction of the thread pool service, as well as its own Future&Promise architecture. The submit method ADAPTS the JDK’s Submit method and returns the NettyFuture inherited from JDKFuture. The newTaskFor method, which in the JDK returns FutureTask, is Netty’s own PromiseTask.

// The subclass AbstractExecutorService interface defines the Future of the JDK;
// The interface definition returns the Future of Netty (because it is a subclass of JDKFuture).
@Override
public <T> Future<T> submit(Runnable task, T result) {
    return (Future<T>) super.submit(task, result);
}
// rewrite the JDK AbstractExecutorService#newTaskFor method to access Netty's own PromiseTask
@Override
protected final <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
  return new PromiseTask<T>(this, runnable, value);
}
Copy the code

PromiseTask

PromiseTask, which inherits Netty’s DefaultPromise and implements THE JDK’s RunnableFuture, is an adaptation class.

class PromiseTask<V> extends DefaultPromise<V> implements RunnableFuture<V> {
    // A Callable or a Runnable
    private Object task;
}
Copy the code

On the one hand, PromiseTask disables all methods for setting Promise outcomes.

@Override
public final Promise<V> setFailure(Throwable cause) {
    throw new IllegalStateException();
}
@Override
public final Promise<V> setSuccess(V result) {
  throw new IllegalStateException();
}
@Override
public final boolean tryFailure(Throwable cause) {
  return false;
}
// ... 
Copy the code

On the other hand, implementing the JDK’s RunnableFuture ensures that Promise results can only be set when the task is actually executed, in order to commit to the JDK’s thread pool abstraction service.

@Override
public void run(a) {
    try {
        // super.setUncancellable Promise setting cannot be cancelled
        if (setUncancellableInternal()) {
            // Execute the task
            V result = runTask();
            // super.setSuccess Promise setting executed successfullysetSuccessInternal(result); }}catch (Throwable e) {
        // super.setFailure Promise setting failssetFailureInternal(e); }}final V runTask(a) throws Exception {
  final Object task = this.task;
  if (task instanceof Callable) {
    return ((Callable<V>) task).call();
  }
  ((Runnable) task).run();
  return null;
}
Copy the code

AbstractScheduledEventExecutor

AbstractScheduledEventExecutor on the basis of AbstractEventExecutor has realized the timing delay/task execution.

public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor {
    // Priority queue, which stores delayed tasks, arranged in ascending order according to the next execution time of the tasksPriorityQueue<ScheduledFutureTask<? >> scheduledTaskQueue;// Next task id
    long nextTaskId;
}
Copy the code

ScheduledTaskQueue if it is the current EventLoop thread. Otherwise, the execute method implemented by subclasses is called.

@Override
publicScheduledFuture<? > schedule(Runnable command,long delay, TimeUnit unit) {
  return schedule(new ScheduledFutureTask<Void>(
    this,
    command,
    deadlineNanos(unit.toNanos(delay))));
}
private <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
    if (inEventLoop()) {
        scheduleFromEventLoop(task);
    } else {
        // ...
        execute(task);
    }
    return task;
}
final void scheduleFromEventLoop(finalScheduledFutureTask<? > task) {
  scheduledTaskQueue().add(task.setId(++nextTaskId));
}
Copy the code

PollScheduledTask gets the task, and the execution thread must be an EventLoop thread. This method is called by subclasses, which control tasks in the consumption priority queue.

protected final Runnable pollScheduledTask(long nanoTime) {
    assert inEventLoop(a);
    // Fetch the first task in the priority queueScheduledFutureTask<? > scheduledTask = peekScheduledTask();// If it is time to execute the task, it is removed from the queue and returned
    if (scheduledTask == null || scheduledTask.deadlineNanos() - nanoTime > 0) {
        return null;
    }
    scheduledTaskQueue.remove();
    scheduledTask.setConsumed();
    return scheduledTask;
}
Copy the code

SingleThreadEventExecutor

SingleThreadEventExecutor realized single-threaded EventExecutor the execute method, and the binding with EventLoop thread. Let’s start with member variables.

public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
    // Status field atomic updater
    private static final AtomicIntegerFieldUpdater<SingleThreadEventExecutor> STATE_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(SingleThreadEventExecutor.class, "state");
    // Common task queue
    private final Queue<Runnable> taskQueue;
    // Bind threads
    private volatile Thread thread;
    // JDKExecutor
    private final Executor executor;
    // The maximum queue length defaults to integer.max_value
    private final int maxPendingTasks;
    / / rejection policies thrown RejectedExecutionException anomalies
    private final RejectedExecutionHandler rejectedExecutionHandler;
    // Last execution time
    private long lastExecutionTime;
    / / state
    private volatile int state = ST_NOT_STARTED;
}
Copy the code
  • SingleThreadEventExecutor maintains the common task queue, queue length is an Integer. MAX_VALUE unbounded can be thought of.
  • Perform tasks via JDKExecutor and bind threads to thread member variables.
  • The behavior of the internal control by members of the state variables SingleThreadEventExecutor.

On the one hand, SingleThreadEventExecutor maintenance task queue, provide a subclass for new tasks/query/auxiliary method, shield the internal task queue. For example, get the Task pollTask method below.

protected Runnable pollTask(a) {
    assert inEventLoop(a);
    return pollTaskFrom(taskQueue);
}

protected static Runnable pollTaskFrom(Queue<Runnable> taskQueue) {
    for (;;) {
        Runnable task = taskQueue.poll();
        if(task ! = WAKEUP_TASK) {// A special Runnable that can be ignored for NioEventLoop
            returntask; }}}Copy the code

Another example is the hasTasks method to check whether tasks exist.

protected boolean hasTasks(a) {
    assert inEventLoop(a);
    return! taskQueue.isEmpty(); }Copy the code

SingleThreadEventExecutor, on the other hand, has realized the JDK Executor of the execute method.

@Override
public void execute(Runnable task) {
    // The second argument, which defaults to true, is ignoredexecute(task, ! (taskinstanceof LazyRunnable) && wakesUpForTask(task));
}
private void execute(Runnable task, boolean immediate) {
  boolean inEventLoop = inEventLoop();
  // Add the task to the task queue
  addTask(task);
  if(! inEventLoop) {// If the current thread is not an EventLoop bound thread, try to start the EventLoop corresponding thread
    startThread();
    if (isShutdown()) {
      / /... Executing a rejection Policy}}/ / wake EventLoop
  / / key NioEventLoop wakeup method, awakens Java nio. Channels. Selector# wakeup
  if (!addTaskWakesUp && immediate) {
    wakeup(inEventLoop);
  }
}
Copy the code

Focus on the startThread method. Execute the doStartThread method after CAS updates the state state successfully.

private void startThread(a) {
    if (state == ST_NOT_STARTED) {
        if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
          doStartThread();
          // ...}}}Copy the code

Eventually doStartThread will perform with EventLoop thread (SingleThreadEventExecutor) made a binding, specific how to run method to realize consumption task queue by subclasses. The executor can be thought of as a new Thread(new Runnable()).start(), which provides a Thread factory for naming threads.

// Bind threads
private volatile Thread thread;
private void doStartThread(a) {
    assert thread == null;
    executor.execute(new Runnable() {
        @Override
        public void run(a) {
            // Set the EventLoop to a thread in the executor
            thread = Thread.currentThread();
            // run abstract method, need to subclass implementation
            SingleThreadEventExecutor.this.run();
            // ...}}); }Copy the code

So SingleThreadEventExecutor also realized the important inEventLoop methods, judge the thread (often the current thread) and binding thread.

// Bind threads
private volatile Thread thread;
@Override
public boolean inEventLoop(Thread thread) {
    return thread == this.thread;
}
Copy the code

SingleThreadEventLoop

SingleThreadEventLoop implements the register abstract method that the EventLoopGroup needs to implement, and also introduces a task queue, a tail task queue.

public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {

    protected static final int DEFAULT_MAX_PENDING_TASKS = Math.max(16,
            SystemPropertyUtil.getInt("io.netty.eventLoop.maxPendingTasks", Integer.MAX_VALUE));
    // The third queue, the tail queue, has the default length integer.max_value
    private final Queue<Runnable> tailTasks;
  
    @Override
    public ChannelFuture register(Channel channel) {
        return register(new DefaultChannelPromise(channel, this));
    }
    // Call unsafe to register
    @Override
    public ChannelFuture register(final ChannelPromise promise) {
        promise.channel().unsafe().register(this, promise);
        return promise;
    }
    // Perform the tail task
    @Override
    protected void afterRunningAllTasks(a) {
        runAllTasksFrom(tailTasks);
    }
    // Override the parent hasTasks method to return false only when both the normal and tail task queues are empty
    @Override
    protected boolean hasTasks(a) {
        return super.hasTasks() || !tailTasks.isEmpty();
    }
}
Copy the code

Second, the NioEventLoop

NioEventLoop does a number of things in the run method:

  • Events that occur on Channel are polled and distributed to Unsafe for processing.
  • Perform tasks, including scheduled tasks, common tasks, and tail tasks.
  • Detect JDK NIO empty polling bug and refactor Selector.
  • Performance optimizations for JDK NIO.
public final class NioEventLoop extends SingleThreadEventLoop {
    // JDK NIO null polling BUG detection threshold. The default is 512
    private static final int SELECTOR_AUTO_REBUILD_THRESHOLD;
    // Is used by the SELECT policy
    private final IntSupplier selectNowSupplier = new IntSupplier() {
        @Override
        public int get(a) throws Exception {
            returnselectNow(); }};// JDK SelectorImpl
    private Selector selector;
    // Netty SelectedSelectionKeySetSelector
    private Selector unwrappedSelector;
    // Store SelectionKey from AbstractSet
    private SelectedSelectionKeySet selectedKeys;
    // Create JDK Selector
    private final SelectorProvider provider;
    // a status flag
    private final AtomicLong nextWakeupNanos = new AtomicLong(AWAKE);
    / / select strategy
    private final SelectStrategy selectStrategy;
    // ioRatio
    private volatile int ioRatio = 50;
}
Copy the code

1. A performance optimization

NioEventLoop has a special member variable: SelectedSelectionKeySet selectedKeys, SelectedSelectionKeySet inherits AbstractSet which is a Set that stores selectionkeys.

final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {
    SelectionKey[] keys;
    int size;
}
Copy the code

Why is there such a special Set and what does it do? The answer is found in the constructor of NioEventLoop.

NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
             SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
             EventLoopTaskQueueFactory queueFactory) {
    super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
            rejectedExecutionHandler);
    this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
    this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
    final SelectorTuple selectorTuple = openSelector();
    this.selector = selectorTuple.selector;
    this.unwrappedSelector = selectorTuple.unwrappedSelector;
}
Copy the code

Look at the openSelector method.

private SelectorTuple openSelector(a) {
    // Create JDK Selector
    final Selector unwrappedSelector;
    try {
        unwrappedSelector = provider.openSelector();
    } catch (IOException e) {
        throw new ChannelException("failed to open a new selector", e);
    }
    // Enable optimization by default
    if (DISABLE_KEY_SET_OPTIMIZATION) {
        return new SelectorTuple(unwrappedSelector);
    }

    Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
        @Override
        public Object run(a) {
            try {
                return Class.forName(
                        "sun.nio.ch.SelectorImpl".false,
                        PlatformDependent.getSystemClassLoader());
            } catch (Throwable cause) {
                returncause; }}});if(! (maybeSelectorImplClassinstanceofClass) || ! ((Class<? >) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {if (maybeSelectorImplClass instanceof Throwable) {
            Throwable t = (Throwable) maybeSelectorImplClass;
        }
        return new SelectorTuple(unwrappedSelector);
    }

    finalClass<? > selectorImplClass = (Class<? >) maybeSelectorImplClass;final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();

    // Force Netty's SelectedSelectionKeySet on JDK SelectorImpl's selectedKeys and publicSelectedKeys by reflection
    Setselectionkeyset selectionKeySet selectionKeySet selectionKeySet selectionKeySet selectionKeySet selectionKeySet selectionKeySet selectionKeySet selectionKeySet selectionKeySet selectionKeySet
    Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
        @Override
        public Object run(a) {
            try {
                Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
                Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
                if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {
                    long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
                    long publicSelectedKeysFieldOffset =
                            PlatformDependent.objectFieldOffset(publicSelectedKeysField);

                    if(selectedKeysFieldOffset ! = -1&& publicSelectedKeysFieldOffset ! = -1) {
                        PlatformDependent.putObject(
                                unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);
                        PlatformDependent.putObject(
                                unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);
                        return null;
                    }
                }
                Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
                if(cause ! =null) {
                    return cause;
                }
                cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
                if(cause ! =null) {
                    return cause;
                }
                selectedKeysField.set(unwrappedSelector, selectedKeySet);
                publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
                return null;
            } catch (NoSuchFieldException e) {
                return e;
            } catch (IllegalAccessException e) {
                returne; }}});// If the injection fails, demote to JDK native Selector
    if (maybeException instanceof Exception) {
        selectedKeys = null;
        Exception e = (Exception) maybeException;
        return new SelectorTuple(unwrappedSelector);
    }
    / / success injection, the use of SelectedSelectionKeySetSelector agent native Selector
    selectedKeys = selectedKeySet;
    return new SelectorTuple(unwrappedSelector,
                             new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
}
Copy the code

Add Netty’s own SelectedSelectionKeySet to the JDK’s SelectorImpl member variable selectedKeys. ** When the JDK Selector. Select method is executed, the corresponding SelectionKey is placed in selectedKeys. If the collection object is swapped by reflection, the element is added to Netty’s custom SelectedSelectionKeySet.

public abstract class SelectorImpl extends AbstractSelector {
    protected Set<SelectionKey> selectedKeys = new HashSet();
}
Copy the code

Replacing the original HashSet with a custom Set implementation has the advantage of Netty usage scenarios + underlying data structure optimization. Since Netty only needs to iterate over the SelectionKey and does not need hash table dependent methods such as Contains, an array is a more appropriate data structure.

On the one hand, the Add method does not have hash conflicts.

@Override
public boolean add(SelectionKey o) {
    if (o == null) {
        return false;
    }

    keys[size++] = o;
    if (size == keys.length) {
        increaseCapacity();
    }

    return true;
}
Copy the code

On the other hand, you can get the SelectionKey by traversing the array directly.

private void processSelectedKeysOptimized(a) {
    for (int i = 0; i < selectedKeys.size; ++i) {
        final SelectionKey k = selectedKeys.keys[i];
        // ...}}Copy the code

This is said to improve performance by 1-2%.

2, polling

The most important function of NioEventLoop is to poll a Channel for IO events and throw THE IO events to the Unsafe for processing. Break the run method into steps.

  • Perform select based on the policy
  • Processing selectionKey
  • Perform tasks according to ioRatio
protected void run(a) {
    int selectCnt = 0;
    for (;;) {
        try {
            // 1. Run the select command to execute the policy
            int strategy;
            try {
                // hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
                strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                switch (strategy) {
                case SelectStrategy.CONTINUE:
                    continue;
                case SelectStrategy.BUSY_WAIT:
                case SelectStrategy.SELECT:
                    // The execution time of the next scheduledTask
                    long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
                    if (curDeadlineNanos == -1L) {
                        curDeadlineNanos = NONE;
                    }
                    nextWakeupNanos.set(curDeadlineNanos);
                    try {
                        if(! hasTasks()) {// If curDeadlineNanos=NONE blocks indefinitely on selector. Select waiting to wake upstrategy = select(curDeadlineNanos); }}finally {
                        nextWakeupNanos.lazySet(AWAKE);
                    }
                default:}}catch (IOException e) {
                // ...
                continue;
            }

            selectCnt++;
            cancelledKeys = 0;
            needsToSelectAgain = false;
            final int ioRatio = this.ioRatio;
            boolean ranTasks;
            if (ioRatio == 100) {
              	if (strategy > 0) {
                	processSelectedKeys();
              	}
              	ranTasks = runAllTasks();
            } else if (strategy > 0) {
                // select returns greater than 0
                final long ioStartTime = System.nanoTime();
              	// 2. Process selectionKey
             	  processSelectedKeys();
                // 3. Execute tasks
              	final long ioTime = System.nanoTime() - ioStartTime;
                ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
            } else {
                // 3. Select 0 to execute the task
                ranTasks = runAllTasks(0);
            }
            if (ranTasks || strategy > 0) {
                selectCnt = 0;
            }
            // If selectCnt>512 thinks the JDKNIO empty polling bug has occurred, reconstruct the Selector
            else if (unexpectedSelectorWakeup(selectCnt)) {
                selectCnt = 0; }}catch (CancelledKeyException e) {
        } catch (Throwable t) {
            handleLoopException(t);
        }
        / /... Shut down}}Copy the code

select

Whether and how a Select is performed depends on the task queue.

First, determine how to perform select based on the policy.

strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
Copy the code

DefaultSelectStrategy is the DefaultSelectStrategy implementation. If the hasTasks method returns true to indicate that a task needs to be executed, IntSupplier’s get method is used to return the strategy; otherwise, SELECT strategy is returned.

final class DefaultSelectStrategy implements SelectStrategy {
    static final SelectStrategy INSTANCE = new DefaultSelectStrategy();
    private DefaultSelectStrategy(a) {}@Override
    public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
        returnhasTasks ? selectSupplier.get() : SelectStrategy.SELECT; }}Copy the code

The hasTasks method is provided by SingleThreadEventLoop, which returns true if there is a task in the tail task queue or normal task queue, and false otherwise.

// Override the parent hasTasks method to return false only when both the normal and tail task queues are empty
@Override
protected boolean hasTasks(a) {
    return super.hasTasks() || ! tailTasks.isEmpty(); }Copy the code

The IntSupplier implementation class is NioEventLoop#selectNowSupplier, which immediately performs a select.

// Is used by the SELECT policy
private final IntSupplier selectNowSupplier = new IntSupplier() {
    @Override
    public int get(a) throws Exception {
        returnselectNow(); }};int selectNow(a) throws IOException {
  return selector.selectNow();
}
Copy the code

If strategy is greater than or equal to 0, it indicates that selectNow has been executed once. Other policy enumerations are divided into three types, all of which are int types less than 0:

/** * Indicates a blocking select should follow. */
int SELECT = -1;
/** * Indicates the IO loop should be retried, no blocking select to follow directly. */
int CONTINUE = -2;
/** * Indicates the IO loop to poll for new events without blocking. */
int BUSY_WAIT = -3;
Copy the code

Because CONTINUE and BUSY_WAIT are not covered by the default policy, look directly at the implementation of the SELECT policy. First nextScheduledTaskDeadlineNanos for next time task deadline time: if there is no timing task set to NONE; Otherwise, set it to the execution time of the next scheduled task.

private static final long NONE = Long.MAX_VALUE;

// The execution time of the next scheduledTask
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos == -1L) {
	curDeadlineNanos = NONE;
}
nextWakeupNanos.set(curDeadlineNanos);
try {
	if(! hasTasks()) {// If curDeadlineNanos=NONE blocks on selector. Select waiting to wake upstrategy = select(curDeadlineNanos); }}finally {
	nextWakeupNanos.lazySet(AWAKE);
}
Copy the code

If you reconfirm that there are no tail tasks or normal tasks, the select operation is performed.

private int select(long deadlineNanos) throws IOException {
  if (deadlineNanos == NONE) { // block on select
    return selector.select();
  }
  // Calculate the execution time of the scheduled task - the current time
  long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;
  return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);
}
Copy the code

As you can see here, not every poll performs a SELECT operation. If hasTasks returns no task during the first policy judgment and hasTasks returns a task during the second SELECT policy execution, the SELECT operation is not performed.

Processing selectionKey

Regardless of how ioRatio determines the execution of the three tasks, after the select is executed, the SelectionKey needs to be processed.

if (strategy > 0) {
    processSelectedKeys();
}
Copy the code

ProcessSelectedKeys has two kinds of logic. When the selectedKeys member variable is not empty, it indicates that HashSet optimization has been done (see the previous section). Otherwise, use the native JDK processing. The general case follows the previous optimization logic.

// Store SelectionKey from AbstractSet
private SelectedSelectionKeySet selectedKeys;
private void processSelectedKeys(a) {
    if(selectedKeys ! =null) {
        SelectedSelectionKeySet is successfully injected into the selectedKeys of the JDK's SelectorImpl
        processSelectedKeysOptimized();
    } else {
        // If that step fails, use selectedKeys in the native JDKprocessSelectedKeysPlain(selector.selectedKeys()); }}private void processSelectedKeysOptimized(a) {
   for (int i = 0; i < selectedKeys.size; ++i) {
     final SelectionKey k = selectedKeys.keys[i];
     selectedKeys.keys[i] = null;
     final Object a = k.attachment();

     if (a instanceof AbstractNioChannel) {
       processSelectedKey(k, (AbstractNioChannel) a);
     }
     // ...}}Copy the code

When you register a Channel with a Selector, the attachment you pass in is the Channel itself, which goes into the processSelectedKey method. Unsafe calls different methods for handling different events, which will be covered in a later section.

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    if(! k.isValid()) {// ...
        return;
    }
    try {
        int readyOps = k.readyOps();
        // CONNECT event, call finishConnect
        if((readyOps & SelectionKey.OP_CONNECT) ! =0) {
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);
            unsafe.finishConnect();
        }
        // WRITE event that writes the contents of the buffer to the corresponding Channel
        if((readyOps & SelectionKey.OP_WRITE) ! =0) {
            ch.unsafe().forceFlush();
        }
        // The READ or ACCEPT event, which calls the READ method unsafe
        if((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) ! =0 || readyOps == 0) { unsafe.read(); }}catch(CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); }}Copy the code

Perform a task

After processing the events that occur on the selectionKey, different strategies are selected to execute the task according to the ioRatio and the number of events (strategy).

if (ioRatio == 100) {
    // ...
    ranTasks = runAllTasks();
} else if (strategy > 0) {
    // ...
  	ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
} else {
    // ...
    ranTasks = runAllTasks(0);
}
Copy the code

Based on the analysis of the inheritance structure above, NioEventLoop needs to handle three types of tasks: scheduled task, normal task, and tail task.

If the ioRatio is set to 100, all tasks are executed. The runAllTasks method with no parameter periodically pulls scheduled tasks to the common task queue. After all scheduled and common tasks are executed, tail tasks are executed. However, ioRatio is set to 50 by default, which can be ignored.

protected boolean runAllTasks(a) {
    assert inEventLoop(a);
    boolean fetchedAll;
    boolean ranAtLeastOne = false;
    do {
        // 1. Pull tasks from the scheduled task queue to the common task queue
        fetchedAll = fetchFromScheduledTaskQueue();
        // 2. Perform common tasks
        if (runAllTasksFrom(taskQueue)) {
            ranAtLeastOne = true; }}while(! fetchedAll);// keep on processing until we fetched all scheduled tasks.
    if (ranAtLeastOne) {
        lastExecutionTime = ScheduledFutureTask.nanoTime();
    }
    // 3. Perform the tail task
    afterRunningAllTasks();
    return ranAtLeastOne;
}
Copy the code

Strategy >0, it indicates that the event corresponding to selectionKey is activated. At this time, the task will be executed according to the ioRatio. The larger the ioRatio, the shorter the task will be executed.

Strategy =0, indicating that no events corresponding to selectionKey are activated, and tasks will be executed as few as possible, no more than 64 scheduled + ordinary tasks.

protected boolean runAllTasks(long timeoutNanos) {
    // 1. Move the scheduled task from the scheduled task queue to the common task queue
    fetchFromScheduledTaskQueue();
    // 2. Obtain a common task
    Runnable task = pollTask();
    if (task == null) {
        // If there is no task in the first two queues, execute the tail task
        afterRunningAllTasks();
        return false;
    }
    final long deadline = timeoutNanos > 0 ? ScheduledFutureTask.nanoTime() + timeoutNanos : 0;
    long runTasks = 0;
    long lastExecutionTime;
    // 3. Execute the common task (including the scheduled task obtained at the beginning) until the deadline or the common task queue is empty
    for (;;) {
        safeExecute(task);
        runTasks ++;
        // Timeout detection is performed only once every 64 loops because nanoTime performance loss...
        if ((runTasks & 0x3F) = =0) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            if (lastExecutionTime >= deadline) {
                // Timeout terminates the loop
                break;
            }
        }
        task = pollTask();
        // The common task is completed, and the loop is closed
        if (task == null) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            break; }}// 4. Perform all tail missions
    afterRunningAllTasks();
    this.lastExecutionTime = lastExecutionTime;
    return true;
}
Copy the code

3, solve the JDK empty polling bug

In the main polling process of the run method, there is a local variable selectCnt that represents the number of empty polling times that a Channel has no I/O events and has not executed any task. When the selectCnt reaches 512, the Selector will be triggered to rebuild. The purpose is to solve the JDK empty polling bug causing cpu100%.

protected void run(a) {
    int selectCnt = 0;
    for (;;) {
        try {
						// ...
            selectCnt++;
            // If selectCnt>512 thinks the JDKNIO empty polling bug has occurred, reconstruct the Selector
            else if (unexpectedSelectorWakeup(selectCnt)) {
                selectCnt = 0;
            }
            // ...}}private boolean unexpectedSelectorWakeup(int selectCnt) {
    if (Thread.interrupted()) {
        return true;
    }
    // selectCnt > 512 thinks a JDK empty polling BUG has occurred
    if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
            selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
        rebuildSelector();
        return true;
    }
    return false;
}

/**
* Replaces the current {@link Selector} of this event loop with newly created {@link Selector}s to work around the infamous epoll 100% CPU bug.
*/
public void rebuildSelector(a) {
  // EventLoop is serialized, if run is the main process, run directly
  if(! inEventLoop()) { execute(new Runnable() {
      @Override
      public void run(a) { rebuildSelector0(); }});return;
  }
  rebuildSelector0();
}
Copy the code

The rebuildSelector0 method constructs a new Selector and re-registers the Channel on the old Selector with the new Selector.

private void rebuildSelector0(a) {
    final Selector oldSelector = selector;
    // Construct a new Selector, just like in the constructor
    final SelectorTuple newSelectorTuple = openSelector();
    // Register a Channel with a new Selector
    for (SelectionKey key: oldSelector.keys()) {
        Object a = key.attachment();
        try {
            // Skip if the key is invalid or is already registered with a new selector
            if(! key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) ! =null) {
                continue;
            }
            // Register the key with the new Selector
            int interestOps = key.interestOps();
            key.cancel();
            SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);
            if (a instanceofAbstractNioChannel) { ((AbstractNioChannel) a).selectionKey = newKey; }}catch (Exception e) {
        }
    }
    // Update the selector member variable to the new selector
    selector = newSelectorTuple.selector;
    unwrappedSelector = newSelectorTuple.unwrappedSelector;
    
    // Close the old selector
    oldSelector.close();
}
Copy the code

conclusion

  • AbstractEventExecutor abstractions the Thread pool service AbstractExecutorService from the JDK and connects to its own Future architecture.

  • SingleThreadEventExecutor with EventLoop thread binding, the realization of lock-free serialization.

  • NioEventLoop’s parent class handles three task queues: the scheduled task queue, the normal task queue, and the tail task queue.

  • NioEventLoop provides a performance optimization for JDK selectors by injecting custom sets with reflection.

  • NioEventLoop The process for executing the event loop is select->processSelectionKey->runAllTasks, where select and runAllTasks are executed based on the policy.

  • NioEventLoop addresses the JDK’s empty polling BUG by refactoring the Selector.