Preface
Netty's serialized design avoids thread contention. The core of that design is the EventLoop: every object that needs synchronization is associated with one EventLoop thread, and that thread handles all tasks generated by the object. This chapter takes a closer look at EventLoop.
On the one hand, we study the inheritance structure of NioEventLoop:
- How Netty plugs its own Future and Promise into the JDK's thread pool abstraction (AbstractEventExecutor).
- The three task queues: the scheduled task queue, the common task queue, and the tail task queue.
- How an executor is bound to its EventLoop thread (SingleThreadEventExecutor).
On the other hand, we study the duties and execution flow of NioEventLoop:
- The performance optimization applied to the JDK Selector.
- The workaround for the JDK empty-polling bug.
- The event loop execution flow: select -> processSelectedKeys -> runAllTasks.
EventLoop inheritance structure
1. Abstractions
At the top of the interface hierarchy is EventExecutorGroup.
- It provides EventExecutors and selects the next one through the next method (EventExecutor is itself a subinterface of EventExecutorGroup).
- It manages the life cycle of its EventExecutors and can shut all of them down.
public interface EventExecutorGroup extends ScheduledExecutorService, Iterable<EventExecutor> {
    // Returns true when all EventExecutors in this group are shutting down or have been shut down
    boolean isShuttingDown();
    // Gracefully shut down all EventExecutors
    Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit);
    // Returns one of the EventExecutors managed by this group
    EventExecutor next();
}
EventExecutor in turn extends the EventExecutorGroup interface.
- It is a special EventExecutorGroup whose next method returns only itself.
- It can tell whether a given thread (or the current thread) is the thread of this EventLoop.
- It constructs Promise and Future instances bound to the current EventExecutor.
public interface EventExecutor extends EventExecutorGroup {
    // Returns itself
    @Override
    EventExecutor next();
    // Whether the current thread / the given thread is the thread of this EventLoop
    boolean inEventLoop();
    boolean inEventLoop(Thread thread);
    // Construct Promise and Future instances bound to the current EventExecutor
    <V> Promise<V> newPromise();
    <V> ProgressivePromise<V> newProgressivePromise();
    <V> Future<V> newSucceededFuture(V result);
    <V> Future<V> newFailedFuture(Throwable cause);
}
EventLoopGroup sits alongside EventExecutor: it also extends EventExecutorGroup, and it is a special EventExecutorGroup that supports Channel registration.
public interface EventLoopGroup extends EventExecutorGroup {
    ChannelFuture register(Channel channel);
    ChannelFuture register(ChannelPromise promise);
}
Putting EventExecutor and EventLoopGroup together, the binding relationship Channel <-> EventLoop <-> Thread is easy to see.
OrderedEventExecutor is a marker interface: it marks an EventExecutor that processes all submitted tasks in order.
/**
* Marker interface for {@link EventExecutor}s that will process all submitted tasks in an ordered / serial fashion.
*/
public interface OrderedEventExecutor extends EventExecutor {}
EventLoop can also be regarded as a marker interface. It extends EventLoopGroup, so it can register Channels, and it handles all I/O operations of a Channel once that Channel is registered. One EventLoop instance usually manages multiple Channels.
/**
* Will handle all the I/O operations for a {@link Channel} once registered.
* One {@link EventLoop} instance will usually handle more than one {@link Channel} but this may depend on implementation details and internals.
*/
public interface EventLoop extends OrderedEventExecutor, EventLoopGroup {
    @Override
    EventLoopGroup parent();
}
2. Implementations
AbstractEventExecutor
AbstractEventExecutor is the top-level abstract class (apart from the JDK classes it extends). It implements some general-purpose methods; a few of them are shown here.
public abstract class AbstractEventExecutor extends AbstractExecutorService implements EventExecutor {
    private final EventExecutorGroup parent;
    protected AbstractEventExecutor(EventExecutorGroup parent) {
        this.parent = parent;
    }
    @Override
    public EventExecutorGroup parent() {
        return parent;
    }
    @Override
    public EventExecutor next() {
        return this;
    }
    @Override
    public boolean inEventLoop() {
        return inEventLoop(Thread.currentThread());
    }
    @Override
    public Future<?> shutdownGracefully() {
        return shutdownGracefully(DEFAULT_SHUTDOWN_QUIET_PERIOD, DEFAULT_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS);
    }
    @Override
    public <V> Promise<V> newPromise() {
        return new DefaultPromise<V>(this);
    }
}
AbstractEventExecutor also extends the JDK's AbstractExecutorService, so it reuses the JDK's thread pool abstraction while plugging in Netty's own Future and Promise system. The submit method adapts the JDK's submit and returns Netty's Future, which extends the JDK's Future. The newTaskFor method, which returns a FutureTask in the JDK, returns Netty's own PromiseTask instead.
// AbstractExecutorService declares submit as returning the JDK Future;
// this override narrows the return type to Netty's Future, which is a subtype of the JDK Future.
@Override
public <T> Future<T> submit(Runnable task, T result) {
    return (Future<T>) super.submit(task, result);
}
// Override the JDK's AbstractExecutorService#newTaskFor to plug in Netty's own PromiseTask
@Override
protected final <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
    return new PromiseTask<T>(this, runnable, value);
}
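The same adaptation can be tried with the JDK alone. The following is a minimal sketch, not Netty code: TaggedTask and DirectExecutor are hypothetical names, and TaggedTask merely stands in for PromiseTask. It shows the two hooks discussed above, overriding newTaskFor to control which task type submit creates, and narrowing the return type of submit to that type.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;

class NewTaskForSketch {
    // Hypothetical stand-in for PromiseTask: a FutureTask subtype with one extra method.
    static final class TaggedTask<V> extends FutureTask<V> {
        TaggedTask(Runnable runnable, V value) {
            super(runnable, value);
        }
        boolean isTagged() {
            return true;
        }
    }

    // Runs every task on the calling thread; only newTaskFor and submit matter here.
    static final class DirectExecutor extends AbstractExecutorService {
        private volatile boolean shutdown;

        // Hook that AbstractExecutorService.submit uses to wrap the task.
        @Override
        protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
            return new TaggedTask<>(runnable, value);
        }

        // Covariant return type: narrow the JDK Future to our own subtype.
        @Override
        public <T> TaggedTask<T> submit(Runnable task, T result) {
            return (TaggedTask<T>) super.submit(task, result);
        }

        @Override public void execute(Runnable command) { command.run(); }
        @Override public void shutdown() { shutdown = true; }
        @Override public List<Runnable> shutdownNow() { shutdown = true; return Collections.emptyList(); }
        @Override public boolean isShutdown() { return shutdown; }
        @Override public boolean isTerminated() { return shutdown; }
        @Override public boolean awaitTermination(long timeout, TimeUnit unit) { return shutdown; }
    }

    // Submits a no-op task and returns its preset result through the custom task type.
    static String submitAndGet() {
        TaggedTask<String> future = new DirectExecutor().submit(() -> { }, "done");
        try {
            return future.isTagged() ? future.get() : "untagged";
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(submitAndGet()); // prints "done"
    }
}
```

The cast in submit is safe only because newTaskFor is the single place where tasks are created, which is the same reasoning that makes the cast in the snippet above safe.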
PromiseTask
PromiseTask extends Netty's DefaultPromise and implements the JDK's RunnableFuture; it is an adapter class.
class PromiseTask<V> extends DefaultPromise<V> implements RunnableFuture<V> {
    // A Callable or a Runnable
    private Object task;
}
On the one hand, PromiseTask disables all public methods that set the Promise's outcome.
@Override
public final Promise<V> setFailure(Throwable cause) {
    throw new IllegalStateException();
}
@Override
public final Promise<V> setSuccess(V result) {
    throw new IllegalStateException();
}
@Override
public final boolean tryFailure(Throwable cause) {
    return false;
}
// ...
On the other hand, it implements the JDK's RunnableFuture so that it can be submitted to the JDK's thread pool abstraction, and this guarantees that the Promise's outcome is set only when the task is actually executed.
@Override
public void run() {
    try {
        // Same effect as super.setUncancellable: mark the Promise as uncancellable
        if (setUncancellableInternal()) {
            // Execute the task
            V result = runTask();
            // Same effect as super.setSuccess: complete the Promise successfully
            setSuccessInternal(result);
        }
    } catch (Throwable e) {
        // Same effect as super.setFailure: fail the Promise
        setFailureInternal(e);
    }
}
final V runTask() throws Exception {
    final Object task = this.task;
    if (task instanceof Callable) {
        return ((Callable<V>) task).call();
    }
    ((Runnable) task).run();
    return null;
}
AbstractScheduledEventExecutor
AbstractScheduledEventExecutor builds on AbstractEventExecutor and adds the execution of scheduled and delayed tasks.
public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor {
    // Priority queue of delayed tasks, ordered ascending by each task's next execution time
    PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue;
    // Next task id
    long nextTaskId;
}
The schedule method adds the task to scheduledTaskQueue directly if the caller is the current EventLoop thread. Otherwise, it calls the execute method implemented by subclasses.
@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
    return schedule(new ScheduledFutureTask<Void>(
            this,
            command,
            deadlineNanos(unit.toNanos(delay))));
}
private <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
    if (inEventLoop()) {
        scheduleFromEventLoop(task);
    } else {
        // ...
        execute(task);
    }
    return task;
}
final void scheduleFromEventLoop(final ScheduledFutureTask<?> task) {
    scheduledTaskQueue().add(task.setId(++nextTaskId));
}
pollScheduledTask fetches a task, and the calling thread must be the EventLoop thread. The method is called by subclasses, which decide when to consume tasks from the priority queue.
protected final Runnable pollScheduledTask(long nanoTime) {
    assert inEventLoop();
    // Peek at the first task in the priority queue
    ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
    // Return null if there is no task or the head task is not yet due
    if (scheduledTask == null || scheduledTask.deadlineNanos() - nanoTime > 0) {
        return null;
    }
    // The head task is due: remove it from the queue and return it
    scheduledTaskQueue.remove();
    scheduledTask.setConsumed();
    return scheduledTask;
}
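The peek-then-remove pattern can be reproduced with a plain java.util.PriorityQueue. This is a simplified sketch under stated assumptions: the Timed class and the pollDue method are hypothetical names, and time is passed in as a plain number instead of being read from a clock.

```java
import java.util.PriorityQueue;

class ScheduledPollSketch {
    // Hypothetical minimal task: a name plus an absolute deadline in nanoseconds.
    static final class Timed implements Comparable<Timed> {
        final String name;
        final long deadlineNanos;
        Timed(String name, long deadlineNanos) {
            this.name = name;
            this.deadlineNanos = deadlineNanos;
        }
        @Override
        public int compareTo(Timed other) {
            return Long.compare(deadlineNanos, other.deadlineNanos);
        }
    }

    // Ordered by deadline, so the head is always the task that is due first.
    private final PriorityQueue<Timed> queue = new PriorityQueue<>();

    void schedule(String name, long deadlineNanos) {
        queue.add(new Timed(name, deadlineNanos));
    }

    // Returns the head task's name only if its deadline has been reached, else null.
    String pollDue(long nowNanos) {
        Timed head = queue.peek();
        if (head == null || head.deadlineNanos - nowNanos > 0) {
            return null;
        }
        queue.remove();
        return head.name;
    }

    static String demo() {
        ScheduledPollSketch sketch = new ScheduledPollSketch();
        sketch.schedule("late", 300);
        sketch.schedule("early", 100);
        return sketch.pollDue(50) + "," + sketch.pollDue(150) + ","
                + sketch.pollDue(150) + "," + sketch.pollDue(300);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "null,early,null,late"
    }
}
```

Because only the head is inspected, a poll costs one peek when nothing is due, no matter how many tasks are queued.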
SingleThreadEventExecutor
SingleThreadEventExecutor implements the execute method for a single-threaded EventExecutor and binds the executor to its EventLoop thread. Let's start with the member variables.
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
    // Atomic updater for the state field
    private static final AtomicIntegerFieldUpdater<SingleThreadEventExecutor> STATE_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(SingleThreadEventExecutor.class, "state");
    // Common task queue
    private final Queue<Runnable> taskQueue;
    // The bound thread
    private volatile Thread thread;
    // JDK Executor
    private final Executor executor;
    // Maximum number of pending tasks; defaults to Integer.MAX_VALUE
    private final int maxPendingTasks;
    // Rejection policy; by default it throws RejectedExecutionException
    private final RejectedExecutionHandler rejectedExecutionHandler;
    // Time of the last task execution
    private long lastExecutionTime;
    // State
    private volatile int state = ST_NOT_STARTED;
}
- SingleThreadEventExecutor maintains the common task queue. Its maximum length is Integer.MAX_VALUE, so it can be regarded as unbounded.
- Tasks are run through a JDK Executor, and the thread that runs them is bound to the thread member variable.
- The state member variable controls the internal behavior of SingleThreadEventExecutor.
On the one hand, SingleThreadEventExecutor maintains the task queue and offers subclasses helper methods for adding and querying tasks, hiding the queue itself. One example is the pollTask method below, which fetches a task.
protected Runnable pollTask() {
    assert inEventLoop();
    return pollTaskFrom(taskQueue);
}
protected static Runnable pollTaskFrom(Queue<Runnable> taskQueue) {
    for (;;) {
        Runnable task = taskQueue.poll();
        // WAKEUP_TASK is a special Runnable that can be ignored for NioEventLoop
        if (task != WAKEUP_TASK) {
            return task;
        }
    }
}
Another example is the hasTasks method to check whether tasks exist.
protected boolean hasTasks() {
    assert inEventLoop();
    return !taskQueue.isEmpty();
}
On the other hand, SingleThreadEventExecutor implements the execute method of the JDK's Executor.
@Override
public void execute(Runnable task) {
    // The second argument is true by default and can be ignored here
    execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task));
}
private void execute(Runnable task, boolean immediate) {
    boolean inEventLoop = inEventLoop();
    // Add the task to the task queue
    addTask(task);
    if (!inEventLoop) {
        // The caller is not the bound EventLoop thread: try to start that thread
        startThread();
        if (isShutdown()) {
            // ... apply the rejection policy
        }
    }
    // Wake up the EventLoop.
    // The key point: NioEventLoop's wakeup method calls java.nio.channels.Selector#wakeup
    if (!addTaskWakesUp && immediate) {
        wakeup(inEventLoop);
    }
}
Focus on the startThread method: once the CAS update of the state field succeeds, it calls doStartThread.
private void startThread() {
    if (state == ST_NOT_STARTED) {
        if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
            doStartThread();
            // ...
        }
    }
}
doStartThread finally binds the executing thread to the EventLoop (SingleThreadEventExecutor); how the run method consumes the task queue is left to subclasses. The executor here can be thought of as new Thread(new Runnable()).start(), combined with a thread factory that names the threads.
// The bound thread
private volatile Thread thread;
private void doStartThread() {
    assert thread == null;
    executor.execute(new Runnable() {
        @Override
        public void run() {
            // Bind the EventLoop to the executor thread that is running this Runnable
            thread = Thread.currentThread();
            // run is abstract and must be implemented by subclasses
            SingleThreadEventExecutor.this.run();
            // ...
        }
    });
}
With the thread bound, SingleThreadEventExecutor can implement the important inEventLoop method, which checks whether a given thread (usually the current thread) is the bound thread.
// The bound thread
private volatile Thread thread;
@Override
public boolean inEventLoop(Thread thread) {
    return thread == this.thread;
}
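The pieces above (queue the task, start the thread once with a CAS, record the thread, compare against it) fit into a few lines of plain JDK code. This is a deliberately reduced sketch, not Netty's implementation: BoundThreadSketch is a hypothetical class, it uses an AtomicBoolean instead of the state field updater, and it creates its thread directly instead of going through an Executor.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

class BoundThreadSketch {
    private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
    private final AtomicBoolean started = new AtomicBoolean();
    // The bound thread; null until the loop thread has started.
    private volatile Thread thread;

    boolean inEventLoop() {
        return Thread.currentThread() == thread;
    }

    void execute(Runnable task) {
        taskQueue.add(task);
        // Only an outside thread that wins the CAS starts the loop thread, exactly once.
        if (!inEventLoop() && started.compareAndSet(false, true)) {
            Thread loopThread = new Thread(this::run, "sketch-loop");
            loopThread.setDaemon(true);
            loopThread.start();
        }
    }

    private void run() {
        // Bind this executor to the thread that runs the loop.
        thread = Thread.currentThread();
        try {
            for (;;) {
                taskQueue.take().run();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Returns {inEventLoop seen by the caller, inEventLoop seen inside a task}.
    static boolean[] demo() {
        BoundThreadSketch loop = new BoundThreadSketch();
        boolean[] seen = new boolean[2];
        CountDownLatch done = new CountDownLatch(1);
        seen[0] = loop.inEventLoop();
        loop.execute(() -> {
            seen[1] = loop.inEventLoop();
            done.countDown();
        });
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen;
    }

    public static void main(String[] args) {
        boolean[] seen = demo();
        System.out.println(seen[0] + " " + seen[1]); // prints "false true"
    }
}
```

Since every task runs on the one bound thread, state touched only by tasks needs no locking, which is the point of the serialized design.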
SingleThreadEventLoop
SingleThreadEventLoop implements the register methods declared by EventLoopGroup and introduces one more task queue, the tail task queue.
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
    protected static final int DEFAULT_MAX_PENDING_TASKS = Math.max(16,
            SystemPropertyUtil.getInt("io.netty.eventLoop.maxPendingTasks", Integer.MAX_VALUE));
    // The third queue, the tail task queue; its default maximum length is Integer.MAX_VALUE
    private final Queue<Runnable> tailTasks;
    @Override
    public ChannelFuture register(Channel channel) {
        return register(new DefaultChannelPromise(channel, this));
    }
    // Delegate the registration to the Channel's Unsafe
    @Override
    public ChannelFuture register(final ChannelPromise promise) {
        promise.channel().unsafe().register(this, promise);
        return promise;
    }
    // Run the tail tasks
    @Override
    protected void afterRunningAllTasks() {
        runAllTasksFrom(tailTasks);
    }
    // Override the parent's hasTasks: return false only when both the common and the tail task queues are empty
    @Override
    protected boolean hasTasks() {
        return super.hasTasks() || !tailTasks.isEmpty();
    }
}
NioEventLoop
NioEventLoop does several things in its run method:
- It polls for events on Channels and dispatches them to Unsafe for processing.
- It executes tasks, including scheduled tasks, common tasks, and tail tasks.
- It detects the JDK NIO empty-polling bug and rebuilds the Selector.
- It applies a performance optimization to JDK NIO.
public final class NioEventLoop extends SingleThreadEventLoop {
    // Detection threshold for the JDK NIO empty-polling bug; the default is 512
    private static final int SELECTOR_AUTO_REBUILD_THRESHOLD;
    // Used by the select strategy
    private final IntSupplier selectNowSupplier = new IntSupplier() {
        @Override
        public int get() throws Exception {
            return selectNow();
        }
    };
    // Netty's SelectedSelectionKeySetSelector wrapper (the JDK Selector itself if the optimization is off)
    private Selector selector;
    // The underlying JDK Selector (SelectorImpl)
    private Selector unwrappedSelector;
    // Array-backed Set (extends AbstractSet) that stores the selected SelectionKeys
    private SelectedSelectionKeySet selectedKeys;
    // Creates the JDK Selector
    private final SelectorProvider provider;
    // Wakeup state flag (AWAKE while the loop is running)
    private final AtomicLong nextWakeupNanos = new AtomicLong(AWAKE);
    // Select strategy
    private final SelectStrategy selectStrategy;
    // ioRatio
    private volatile int ioRatio = 50;
}
1. Performance optimization
NioEventLoop has a special member variable, SelectedSelectionKeySet selectedKeys. SelectedSelectionKeySet extends AbstractSet and is a Set that stores SelectionKeys.
final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {
    SelectionKey[] keys;
    int size;
}
Why is there such a special Set and what does it do? The answer is found in the constructor of NioEventLoop.
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
             SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
             EventLoopTaskQueueFactory queueFactory) {
    super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
            rejectedExecutionHandler);
    this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
    this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
    final SelectorTuple selectorTuple = openSelector();
    this.selector = selectorTuple.selector;
    this.unwrappedSelector = selectorTuple.unwrappedSelector;
}
Look at the openSelector method.
private SelectorTuple openSelector() {
    // Create the JDK Selector
    final Selector unwrappedSelector;
    try {
        unwrappedSelector = provider.openSelector();
    } catch (IOException e) {
        throw new ChannelException("failed to open a new selector", e);
    }
    // The optimization is enabled by default
    if (DISABLE_KEY_SET_OPTIMIZATION) {
        return new SelectorTuple(unwrappedSelector);
    }
    Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
        @Override
        public Object run() {
            try {
                return Class.forName(
                        "sun.nio.ch.SelectorImpl",
                        false,
                        PlatformDependent.getSystemClassLoader());
            } catch (Throwable cause) {
                return cause;
            }
        }
    });
    if (!(maybeSelectorImplClass instanceof Class) ||
        !((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
        if (maybeSelectorImplClass instanceof Throwable) {
            Throwable t = (Throwable) maybeSelectorImplClass;
            // ...
        }
        return new SelectorTuple(unwrappedSelector);
    }
    final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
    final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
    // Use reflection to replace the selectedKeys and publicSelectedKeys fields
    // of the JDK's SelectorImpl with Netty's SelectedSelectionKeySet
    Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
        @Override
        public Object run() {
            try {
                Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
                Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
                if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {
                    long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
                    long publicSelectedKeysFieldOffset =
                            PlatformDependent.objectFieldOffset(publicSelectedKeysField);
                    if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) {
                        PlatformDependent.putObject(
                                unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);
                        PlatformDependent.putObject(
                                unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);
                        return null;
                    }
                }
                Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
                if (cause != null) {
                    return cause;
                }
                cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
                if (cause != null) {
                    return cause;
                }
                selectedKeysField.set(unwrappedSelector, selectedKeySet);
                publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
                return null;
            } catch (NoSuchFieldException e) {
                return e;
            } catch (IllegalAccessException e) {
                return e;
            }
        }
    });
    // If the injection failed, fall back to the JDK's native Selector
    if (maybeException instanceof Exception) {
        selectedKeys = null;
        Exception e = (Exception) maybeException;
        // ...
        return new SelectorTuple(unwrappedSelector);
    }
    // The injection succeeded: wrap the native Selector in a SelectedSelectionKeySetSelector
    selectedKeys = selectedKeySet;
    return new SelectorTuple(unwrappedSelector,
            new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
}
In short, Netty injects its own SelectedSelectionKeySet into the selectedKeys member variable of the JDK's SelectorImpl. When the JDK's Selector.select method runs, the ready SelectionKeys are placed into selectedKeys. Because the collection object has been swapped through reflection, those keys end up in Netty's custom SelectedSelectionKeySet.
public abstract class SelectorImpl extends AbstractSelector {
    protected Set<SelectionKey> selectedKeys = new HashSet<>();
}
Replacing the original HashSet with a custom Set implementation pays off because the data structure can be tailored to how Netty uses it. Netty only needs to iterate over the SelectionKeys and does not need hash-table operations such as contains, so an array is the more suitable data structure.
On the one hand, the add method never has to deal with hash collisions.
@Override
public boolean add(SelectionKey o) {
    if (o == null) {
        return false;
    }
    keys[size++] = o;
    if (size == keys.length) {
        increaseCapacity();
    }
    return true;
}
On the other hand, you can get the SelectionKey by traversing the array directly.
private void processSelectedKeysOptimized() {
    for (int i = 0; i < selectedKeys.size; ++i) {
        final SelectionKey k = selectedKeys.keys[i];
        // ...
    }
}
This is said to improve performance by 1-2%.
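To make the data structure concrete, here is a small generic sketch of an array-backed Set in the same spirit. ArrayKeySetSketch is a hypothetical class written for this article, not Netty's SelectedSelectionKeySet: add appends to an array and doubles it when full, and consumers read the array by index. Note the trade-off the sketch makes on purpose: without hashing there is no duplicate check, so it is only suitable when the producer never adds the same element twice.

```java
import java.util.AbstractSet;
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;

class ArrayKeySetSketch<E> extends AbstractSet<E> {
    Object[] keys = new Object[4];
    int size;

    // Append only: no hashing, no collision handling, no duplicate check.
    @Override
    public boolean add(E e) {
        if (e == null) {
            return false;
        }
        keys[size++] = e;
        if (size == keys.length) {
            keys = Arrays.copyOf(keys, keys.length << 1);
        }
        return true;
    }

    @Override
    public int size() {
        return size;
    }

    @Override
    public Iterator<E> iterator() {
        return new Iterator<E>() {
            private int idx;

            @Override
            public boolean hasNext() {
                return idx < size;
            }

            @Override
            @SuppressWarnings("unchecked")
            public E next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                return (E) keys[idx++];
            }
        };
    }

    // Adds five elements (forcing one growth step) and reads them back by index.
    static String demo() {
        ArrayKeySetSketch<String> set = new ArrayKeySetSketch<>();
        for (String s : new String[] {"a", "b", "c", "d", "e"}) {
            set.add(s);
        }
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < set.size; ++i) {
            sb.append(set.keys[i]);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "abcde"
    }
}
```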
2. Polling
The most important job of NioEventLoop is to poll Channels for I/O events and hand those events to Unsafe for processing. The run method can be broken into three steps.
- Perform select according to the strategy
- Process the selected SelectionKeys
- Execute tasks according to ioRatio
protected void run() {
    int selectCnt = 0;
    for (;;) {
        try {
            // 1. Perform select according to the strategy
            int strategy;
            try {
                // hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
                strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                switch (strategy) {
                case SelectStrategy.CONTINUE:
                    continue;
                case SelectStrategy.BUSY_WAIT:
                case SelectStrategy.SELECT:
                    // Deadline of the next scheduled task
                    long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
                    if (curDeadlineNanos == -1L) {
                        curDeadlineNanos = NONE;
                    }
                    nextWakeupNanos.set(curDeadlineNanos);
                    try {
                        if (!hasTasks()) {
                            // If curDeadlineNanos == NONE, block in Selector.select until woken up
                            strategy = select(curDeadlineNanos);
                        }
                    } finally {
                        nextWakeupNanos.lazySet(AWAKE);
                    }
                default:
                }
            } catch (IOException e) {
                // ...
                continue;
            }
            selectCnt++;
            cancelledKeys = 0;
            needsToSelectAgain = false;
            final int ioRatio = this.ioRatio;
            boolean ranTasks;
            if (ioRatio == 100) {
                if (strategy > 0) {
                    processSelectedKeys();
                }
                ranTasks = runAllTasks();
            } else if (strategy > 0) {
                // select returned a value greater than 0
                final long ioStartTime = System.nanoTime();
                // 2. Process the selected SelectionKeys
                processSelectedKeys();
                // 3. Execute tasks
                final long ioTime = System.nanoTime() - ioStartTime;
                ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
            } else {
                // 3. select returned 0: execute tasks
                ranTasks = runAllTasks(0);
            }
            if (ranTasks || strategy > 0) {
                selectCnt = 0;
            }
            // If selectCnt reaches 512, assume the JDK NIO empty-polling bug has occurred and rebuild the Selector
            else if (unexpectedSelectorWakeup(selectCnt)) {
                selectCnt = 0;
            }
        } catch (CancelledKeyException e) {
        } catch (Throwable t) {
            handleLoopException(t);
        }
        // ... shutdown handling
    }
}
select
Whether and how a select is performed depends on the task queues.
First, the strategy determines how select is performed.
strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
DefaultSelectStrategy is the default SelectStrategy implementation. If hasTasks is true, meaning there are tasks to execute, the strategy is the value returned by the IntSupplier's get method; otherwise the SELECT strategy is returned.
final class DefaultSelectStrategy implements SelectStrategy {
    static final SelectStrategy INSTANCE = new DefaultSelectStrategy();
    private DefaultSelectStrategy() {}
    @Override
    public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
        return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
    }
}
The hasTasks method is provided by SingleThreadEventLoop; it returns true if the tail task queue or the common task queue contains a task, and false otherwise.
// Override the parent's hasTasks: return false only when both the common and the tail task queues are empty
@Override
protected boolean hasTasks() {
    return super.hasTasks() || !tailTasks.isEmpty();
}
The IntSupplier implementation class is NioEventLoop#selectNowSupplier, which immediately performs a select.
// Used by the select strategy
private final IntSupplier selectNowSupplier = new IntSupplier() {
    @Override
    public int get() throws Exception {
        return selectNow();
    }
};
int selectNow() throws IOException {
    return selector.selectNow();
}
If strategy is greater than or equal to 0, selectNow has already been executed once. The remaining strategy values are three int constants, all less than 0:
/** Indicates a blocking select should follow. */
int SELECT = -1;
/** Indicates the IO loop should be retried, no blocking select to follow directly. */
int CONTINUE = -2;
/** Indicates the IO loop to poll for new events without blocking. */
int BUSY_WAIT = -3;
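The decision rule is small enough to try in isolation. In this sketch, SelectStrategySketch is a hypothetical class, and the JDK's java.util.function.IntSupplier stands in for Netty's own IntSupplier interface, so the supplier call is getAsInt rather than get. A non-negative result means a non-blocking select already ran and reported that many ready keys; -1 means a blocking select should follow.

```java
import java.util.function.IntSupplier;

class SelectStrategySketch {
    // Same value as the SELECT constant listed above.
    static final int SELECT = -1;

    // With pending tasks, select without blocking; otherwise ask for a blocking select.
    static int calculateStrategy(IntSupplier selectNow, boolean hasTasks) {
        return hasTasks ? selectNow.getAsInt() : SELECT;
    }

    public static void main(String[] args) {
        IntSupplier threeReadyKeys = () -> 3; // pretend selectNow found three ready keys
        System.out.println(calculateStrategy(threeReadyKeys, true));  // prints 3
        System.out.println(calculateStrategy(threeReadyKeys, false)); // prints -1
    }
}
```

The supplier is only invoked when tasks are pending, so an idle loop never pays for a selectNow call before blocking.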
Because the default strategy never returns CONTINUE or BUSY_WAIT, we go straight to the SELECT branch. First, nextScheduledTaskDeadlineNanos provides the deadline of the next scheduled task: if there is no scheduled task, the deadline is set to NONE; otherwise it is the execution time of the next scheduled task.
private static final long NONE = Long.MAX_VALUE;
// Deadline of the next scheduled task
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos == -1L) {
    curDeadlineNanos = NONE;
}
nextWakeupNanos.set(curDeadlineNanos);
try {
    if (!hasTasks()) {
        // If curDeadlineNanos == NONE, block in Selector.select until woken up
        strategy = select(curDeadlineNanos);
    }
} finally {
    nextWakeupNanos.lazySet(AWAKE);
}
If a second check confirms that there are still no tail tasks or common tasks, the select operation is performed.
private int select(long deadlineNanos) throws IOException {
    if (deadlineNanos == NONE) { // block on select
        return selector.select();
    }
    // Delay until the scheduled task is due (deadline minus current time), converted to milliseconds
    long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;
    return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);
}
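The constant 995000L deserves a worked example. Adding 995,000 ns before the integer division by 1,000,000 rounds the delay up to the next millisecond unless the deadline is less than 5 microseconds away, in which case the timeout becomes 0 and selectNow is used. The sketch below isolates that arithmetic; timeoutMillis is a hypothetical helper, and clamping a negative delay to 0 is an assumption about how deadlineToDelayNanos behaves.

```java
class SelectTimeoutSketch {
    // Mirrors deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L with an explicit clock value.
    // Assumption: a deadline in the past yields a delay of 0 rather than a negative number.
    static long timeoutMillis(long deadlineNanos, long nowNanos) {
        long delayNanos = Math.max(0L, deadlineNanos + 995000L - nowNanos);
        return delayNanos / 1000000L;
    }

    public static void main(String[] args) {
        System.out.println(timeoutMillis(4_000L, 0L));      // prints 0: under 5 microseconds, use selectNow
        System.out.println(timeoutMillis(5_000L, 0L));      // prints 1: rounded up to one millisecond
        System.out.println(timeoutMillis(1_200_000L, 0L));  // prints 2: 1.2 ms rounds up to 2 ms
        System.out.println(timeoutMillis(0L, 10_000_000L)); // prints 0: the deadline has already passed
    }
}
```

Rounding up rather than down means the loop wakes slightly after the deadline instead of slightly before it, which avoids waking up only to find that the task is not yet due.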
As you can see, not every iteration performs a select. If hasTasks reports no task during the first strategy calculation but reports a task when checked again inside the SELECT branch, the select operation is skipped.
Processing SelectionKeys
Regardless of how ioRatio schedules the three kinds of tasks, once select has reported ready events, the SelectionKeys need to be processed.
if (strategy > 0) {
    processSelectedKeys();
}
processSelectedKeys has two branches. When the selectedKeys member variable is not null, the HashSet replacement has been applied (see the previous section) and the optimized path is used; otherwise the native JDK path is used. Normally the optimized path is taken.
// Array-backed Set (extends AbstractSet) that stores the selected SelectionKeys
private SelectedSelectionKeySet selectedKeys;
private void processSelectedKeys() {
    if (selectedKeys != null) {
        // SelectedSelectionKeySet was successfully injected into the selectedKeys field of the JDK's SelectorImpl
        processSelectedKeysOptimized();
    } else {
        // The injection failed: fall back to the JDK's native selectedKeys
        processSelectedKeysPlain(selector.selectedKeys());
    }
}
private void processSelectedKeysOptimized() {
    for (int i = 0; i < selectedKeys.size; ++i) {
        final SelectionKey k = selectedKeys.keys[i];
        selectedKeys.keys[i] = null;
        final Object a = k.attachment();
        if (a instanceof AbstractNioChannel) {
            processSelectedKey(k, (AbstractNioChannel) a);
        }
        // ...
    }
}
When a Channel is registered with the Selector, the attachment passed in is the Channel itself, so execution enters the processSelectedKey method. Unsafe calls a different method for each kind of event; those methods are covered in a later section.
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    if (!k.isValid()) {
        // ...
        return;
    }
    try {
        int readyOps = k.readyOps();
        // CONNECT event: call finishConnect
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);
            unsafe.finishConnect();
        }
        // WRITE event: flush the buffered data to the Channel
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            ch.unsafe().forceFlush();
        }
        // READ or ACCEPT event: call unsafe.read()
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}
Executing tasks
After the events on the SelectionKeys have been processed, a task execution strategy is chosen based on ioRatio and the number of ready events (strategy).
if (ioRatio == 100) {
    // ...
    ranTasks = runAllTasks();
} else if (strategy > 0) {
    // ...
    ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
} else {
    // ...
    ranTasks = runAllTasks(0);
}
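The budget expression ioTime * (100 - ioRatio) / ioRatio is easier to read with numbers. The sketch below evaluates it for one measured I/O time and three ratios; taskBudgetNanos is a hypothetical helper name, and the 10 ms I/O time is made up for illustration.

```java
class IoRatioBudgetSketch {
    // Time allotted to tasks, given the time just spent on I/O and the configured ioRatio (1..99).
    static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }

    public static void main(String[] args) {
        long ioTimeNanos = 10_000_000L; // assume I/O processing took 10 ms
        System.out.println(taskBudgetNanos(ioTimeNanos, 50)); // prints 10000000 (10 ms, equal shares)
        System.out.println(taskBudgetNanos(ioTimeNanos, 80)); // prints 2500000 (2.5 ms for tasks)
        System.out.println(taskBudgetNanos(ioTimeNanos, 20)); // prints 40000000 (40 ms for tasks)
    }
}
```

The ratio therefore describes the split of one loop iteration: with ioRatio 80, I/O gets 80% of the combined time and tasks get the remaining 20%.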
From the analysis of the inheritance structure above, NioEventLoop has to handle three kinds of tasks: scheduled tasks, common tasks, and tail tasks.
If ioRatio is set to 100, all tasks are executed. The no-argument runAllTasks method repeatedly moves due scheduled tasks into the common task queue; after all due scheduled tasks and common tasks have run, the tail tasks run. Since ioRatio defaults to 50, this branch can be skipped on a first reading.
protected boolean runAllTasks() {
    assert inEventLoop();
    boolean fetchedAll;
    boolean ranAtLeastOne = false;
    do {
        // 1. Move due tasks from the scheduled task queue to the common task queue
        fetchedAll = fetchFromScheduledTaskQueue();
        // 2. Run the common tasks
        if (runAllTasksFrom(taskQueue)) {
            ranAtLeastOne = true;
        }
    } while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.
    if (ranAtLeastOne) {
        lastExecutionTime = ScheduledFutureTask.nanoTime();
    }
    // 3. Run the tail tasks
    afterRunningAllTasks();
    return ranAtLeastOne;
}
If strategy > 0, events on some SelectionKeys are ready. Tasks then run under a time budget derived from ioRatio: the larger ioRatio is, the shorter the time allotted to tasks.
If strategy = 0, no SelectionKey has ready events, and as few tasks as possible are executed: at most 64 scheduled and common tasks.
protected boolean runAllTasks(long timeoutNanos) {
    // 1. Move due tasks from the scheduled task queue to the common task queue
    fetchFromScheduledTaskQueue();
    // 2. Fetch a common task
    Runnable task = pollTask();
    if (task == null) {
        // Neither of the first two queues has a task to run: run the tail tasks
        afterRunningAllTasks();
        return false;
    }
    final long deadline = timeoutNanos > 0 ? ScheduledFutureTask.nanoTime() + timeoutNanos : 0;
    long runTasks = 0;
    long lastExecutionTime;
    // 3. Run common tasks (including the scheduled tasks moved over at the start)
    //    until the deadline passes or the common task queue is empty
    for (;;) {
        safeExecute(task);
        runTasks++;
        // Check the timeout only once every 64 tasks because nanoTime is relatively expensive
        if ((runTasks & 0x3F) == 0) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            if (lastExecutionTime >= deadline) {
                // Timed out: leave the loop
                break;
            }
        }
        task = pollTask();
        // No common task left: leave the loop
        if (task == null) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            break;
        }
    }
    // 4. Run all tail tasks
    afterRunningAllTasks();
    this.lastExecutionTime = lastExecutionTime;
    return true;
}
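The interplay between the deadline and the every-64-tasks check explains the "at most 64" figure for a timeout of 0. The following sketch reproduces just that loop with JDK types. TaskBudgetSketch and runWithBudget are hypothetical names, and elapsed time is measured relative to the start of the call, which is a simplification of the absolute deadline used above.

```java
import java.util.ArrayDeque;
import java.util.Queue;

class TaskBudgetSketch {
    // Runs queued tasks until the queue is empty or the time budget is found to be used up.
    // The clock is consulted only after every 64th task.
    static int runWithBudget(Queue<Runnable> tasks, long timeoutNanos) {
        final long start = System.nanoTime();
        int ran = 0;
        Runnable task;
        while ((task = tasks.poll()) != null) {
            task.run();
            ran++;
            // 0x3F masks the low six bits, so this is true when ran is a multiple of 64.
            if ((ran & 0x3F) == 0 && System.nanoTime() - start >= timeoutNanos) {
                break;
            }
        }
        return ran;
    }

    // Queues the given number of no-op tasks and reports how many were executed.
    static int demo(int queued, long timeoutNanos) {
        Queue<Runnable> tasks = new ArrayDeque<>();
        for (int i = 0; i < queued; i++) {
            tasks.add(() -> { });
        }
        return runWithBudget(tasks, timeoutNanos);
    }

    public static void main(String[] args) {
        System.out.println(demo(200, 0L));             // prints 64: the first clock check ends the loop
        System.out.println(demo(10, 0L));              // prints 10: the queue empties before any check
        System.out.println(demo(200, Long.MAX_VALUE)); // prints 200: the budget is never exhausted
    }
}
```

A budget of 0 does not mean "run nothing": the first check happens only after 64 tasks, so short bursts of tasks always make progress.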
3. Working around the JDK empty-polling bug
In the main loop of the run method, the local variable selectCnt counts consecutive empty polls, that is, iterations in which no Channel had an I/O event and no task was executed. When selectCnt reaches 512, the Selector is rebuilt. The purpose is to work around the JDK empty-polling bug that drives CPU usage to 100%.
protected void run() {
    int selectCnt = 0;
    for (;;) {
        try {
            // ...
            selectCnt++;
            // ...
            if (ranTasks || strategy > 0) {
                selectCnt = 0;
            }
            // If selectCnt reaches 512, assume the JDK NIO empty-polling bug has occurred and rebuild the Selector
            else if (unexpectedSelectorWakeup(selectCnt)) {
                selectCnt = 0;
            }
            // ...
        } catch (Throwable t) {
            handleLoopException(t);
        }
    }
}
private boolean unexpectedSelectorWakeup(int selectCnt) {
    if (Thread.interrupted()) {
        return true;
    }
    // selectCnt >= 512: assume the JDK empty-polling bug has occurred
    if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
            selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
        rebuildSelector();
        return true;
    }
    return false;
}
/**
 * Replaces the current {@link Selector} of this event loop with newly created {@link Selector}s to work around the infamous epoll 100% CPU bug.
 */
public void rebuildSelector() {
    // The EventLoop is serialized: rebuild directly when called from the EventLoop thread, otherwise submit a task
    if (!inEventLoop()) {
        execute(new Runnable() {
            @Override
            public void run() {
                rebuildSelector0();
            }
        });
        return;
    }
    rebuildSelector0();
}
The rebuildSelector0 method constructs a new Selector and re-registers the Channel on the old Selector with the new Selector.
private void rebuildSelector0() {
    final Selector oldSelector = selector;
    // Create a new Selector, exactly as in the constructor
    final SelectorTuple newSelectorTuple = openSelector();
    // Register every Channel with the new Selector
    for (SelectionKey key : oldSelector.keys()) {
        Object a = key.attachment();
        try {
            // Skip keys that are invalid or already registered with the new Selector
            if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {
                continue;
            }
            // Re-register the key with the new Selector
            int interestOps = key.interestOps();
            key.cancel();
            SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);
            if (a instanceof AbstractNioChannel) {
                ((AbstractNioChannel) a).selectionKey = newKey;
            }
        } catch (Exception e) {
            // ...
        }
    }
    // Point the member variables at the new Selector
    selector = newSelectorTuple.selector;
    unwrappedSelector = newSelectorTuple.unwrappedSelector;
    // Close the old Selector
    oldSelector.close();
}
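The re-registration step uses only standard JDK NIO calls, so it can be tried outside Netty. The sketch below is a simplified stand-alone version under stated assumptions: SelectorRebuildSketch and rebuild are hypothetical names, a java.nio.channels.Pipe source serves as the selectable channel so that no network access is needed, and error handling is reduced to rethrowing.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

class SelectorRebuildSketch {
    // Moves every valid registration from oldSelector to a fresh Selector, then closes the old one.
    static Selector rebuild(Selector oldSelector) throws IOException {
        Selector newSelector = Selector.open();
        for (SelectionKey key : oldSelector.keys()) {
            // Skip keys that are invalid or already registered with the new Selector.
            if (!key.isValid() || key.channel().keyFor(newSelector) != null) {
                continue;
            }
            int interestOps = key.interestOps();
            Object attachment = key.attachment();
            key.cancel();
            key.channel().register(newSelector, interestOps, attachment);
        }
        oldSelector.close();
        return newSelector;
    }

    // Registers one channel, rebuilds, and reports "keys:interestOps:attachment:oldSelectorOpen".
    static String demo() {
        try {
            Pipe pipe = Pipe.open();
            pipe.source().configureBlocking(false);
            Selector oldSelector = Selector.open();
            pipe.source().register(oldSelector, SelectionKey.OP_READ, "attachment");

            Selector newSelector = rebuild(oldSelector);
            SelectionKey moved = pipe.source().keyFor(newSelector);
            String result = newSelector.keys().size() + ":" + moved.interestOps() + ":"
                    + moved.attachment() + ":" + oldSelector.isOpen();

            newSelector.close();
            pipe.source().close();
            pipe.sink().close();
            return result;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "1:1:attachment:false"
    }
}
```

The channel itself is never closed during the move; only its registration changes, which is why a rebuild is invisible to the connections involved.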
Conclusion
- AbstractEventExecutor builds on the JDK's thread pool abstraction AbstractExecutorService and plugs in Netty's own Future system.
- SingleThreadEventExecutor binds itself to the EventLoop thread, which makes lock-free serialization possible.
- The parent classes of NioEventLoop maintain three task queues: the scheduled task queue, the common task queue, and the tail task queue.
- NioEventLoop optimizes the JDK Selector by injecting a custom Set through reflection.
- The event loop of NioEventLoop runs select -> processSelectedKeys -> runAllTasks, where select and runAllTasks are performed according to a strategy.
- NioEventLoop works around the JDK empty-polling bug by rebuilding the Selector.