Netty source code interpretation
EventLoop source
1 Instantiate the NioEventLoopGroup
In the server code we instantiated NioEventLoopGroup with its no-argument constructor, so let's start from that constructor. See io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup()
public NioEventLoopGroup() {
    this(0);
}
public NioEventLoopGroup(int nThreads) {
    this(nThreads, (Executor) null);
}
public NioEventLoopGroup(int nThreads, Executor executor) {
    // Note the SelectorProvider obtained here; it should remind you of the Selector in Java NIO
    this(nThreads, executor, SelectorProvider.provider());
}
public NioEventLoopGroup(
        int nThreads, Executor executor, final SelectorProvider selectorProvider) {
    this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
                         final SelectStrategyFactory selectStrategyFactory) {
    super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}
Follow the call into super to see what else happens. See: io.netty.channel.MultithreadEventLoopGroup#MultithreadEventLoopGroup(int, java.util.concurrent.Executor, java.lang.Object…)
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
    // The no-argument constructor passes 0 here, so the final thread count becomes DEFAULT_EVENT_LOOP_THREADS
    super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
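To make the fallback concrete, here is a minimal sketch of the same ternary outside Netty. The class and method names are hypothetical, and the default of twice the available processors is an assumption of this sketch, not a copy of Netty's constant.

```java
final class DefaultThreadCountSketch {
    /** Mirrors the ternary in the constructor above: 0 means "use the default". */
    static int resolveThreadCount(int requested, int defaultCount) {
        return requested == 0 ? defaultCount : requested;
    }

    /** Assumed default for this sketch: twice the available processors, never less than 1. */
    static int defaultThreadCount() {
        return Math.max(1, Runtime.getRuntime().availableProcessors() * 2);
    }

    public static void main(String[] args) {
        int def = defaultThreadCount();
        System.out.println("no-arg group -> " + resolveThreadCount(0, def) + " threads");
        System.out.println("explicit 4   -> " + resolveThreadCount(4, def) + " threads");
    }
}
```

Note that only 0 triggers the fallback; a negative value is passed through unchanged and is rejected later by the nThreads <= 0 check.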
So when no thread count is given, the default is DEFAULT_EVENT_LOOP_THREADS. What is this number? Its definition shows that it is twice the number of available processor cores: Runtime.getRuntime().availableProcessors() * 2. Continuing into super, see io.netty.util.concurrent.MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, java.util.concurrent.Executor, io.netty.util.concurrent.EventExecutorChooserFactory, java.lang.Object…)
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                        EventExecutorChooserFactory chooserFactory, Object... args) {
    if (nThreads <= 0) {
        throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
    }
    // Instantiate the executor
    if (executor == null) {
        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    }
    // The EventExecutor array
    children = new EventExecutor[nThreads];
    for (int i = 0; i < nThreads; i ++) {
        boolean success = false;
        try {
            // Instantiate each element of the array
            children[i] = newChild(executor, args);
            success = true;
        }
        ...
    }
    // Create a chooser for the eventLoops. When there are several EventLoops, the chooser decides which eventLoop thread a Channel runs on.
    chooser = chooserFactory.newChooser(children);
}
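The article does not show what the chooser does internally, so the following is only a hypothetical round-robin sketch of the idea: each call to next() hands out the next child in turn. The class name and the use of Math.floorMod are assumptions of this sketch, not Netty's implementation.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class RoundRobinChooserSketch<T> {
    private final T[] children;
    private final AtomicInteger idx = new AtomicInteger();

    RoundRobinChooserSketch(T[] children) {
        if (children.length == 0) {
            throw new IllegalArgumentException("children must not be empty");
        }
        this.children = children;
    }

    /** Returns the children one after another, wrapping around at the end of the array. */
    T next() {
        // floorMod keeps the index non-negative even after the counter overflows
        return children[Math.floorMod(idx.getAndIncrement(), children.length)];
    }

    public static void main(String[] args) {
        RoundRobinChooserSketch<String> chooser =
                new RoundRobinChooserSketch<>(new String[] {"loop-0", "loop-1", "loop-2"});
        for (int i = 0; i < 4; i++) {
            System.out.println(chooser.next());
        }
    }
}
```

With three children, the fourth call wraps around and returns the first child again, so channels are spread evenly over the eventLoops.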
We continue into the newChild method. See io.netty.channel.nio.NioEventLoopGroup#newChild
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
    // This is where an eventLoop is instantiated. Let's follow the constructor
    return new NioEventLoop(this, executor, (SelectorProvider) args[0],
        ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
Follow the constructor. See io.netty.channel.nio.NioEventLoop#NioEventLoop
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
             SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
    // We won't follow super here; just think of the executor as a member variable that the eventLoop later uses to start its thread
    super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
    if (selectorProvider == null) {
        throw new NullPointerException("selectorProvider");
    }
    if (strategy == null) {
        throw new NullPointerException("selectStrategy");
    }
    provider = selectorProvider;
    // Note that the Java NIO selector is created here and becomes a member variable of the eventLoop
    selector = openSelector();
    selectStrategy = strategy;
}
To summarize: an array of EventLoops is created, by default twice the number of processor cores in size, and each eventLoop is instantiated. In essence, this sets up the member variables of each eventLoop: the Executor and the Selector.
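The shape of the result can be sketched with nothing but the JDK. Everything below (MiniLoopGroupSketch, MiniLoop, the thread-per-task lambda) is a hypothetical stand-in for illustration, assuming one shared executor and one Selector per loop as described above; it is not how Netty's classes are written.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;
import java.util.concurrent.Executor;

final class MiniLoopGroupSketch implements AutoCloseable {
    /** Hypothetical stand-in for an event loop: it holds the shared executor and its own selector. */
    static final class MiniLoop {
        final Executor executor;
        final Selector selector;

        MiniLoop(Executor executor) {
            this.executor = executor;
            try {
                this.selector = Selector.open();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }

    final MiniLoop[] children;

    MiniLoopGroupSketch(int nThreads) {
        if (nThreads <= 0) {
            throw new IllegalArgumentException("nThreads: " + nThreads + " (expected: > 0)");
        }
        // Starts one new thread per submitted task; no thread exists until a task is submitted
        Executor threadPerTask = task -> new Thread(task).start();
        children = new MiniLoop[nThreads];
        for (int i = 0; i < nThreads; i++) {
            children[i] = new MiniLoop(threadPerTask);
        }
    }

    @Override
    public void close() {
        for (MiniLoop child : children) {
            try {
                child.selector.close();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }
}
```

Notice that constructing the group opens the selectors but starts no threads; in this sketch, as in the walkthrough, a thread only appears once something is executed.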
2 Execution logic of eventLoop
Recall from the previous section that, after the ServerSocketChannel is created, it is registered on one of the eventLoops. See io.netty.channel.AbstractChannel.AbstractUnsafe#register
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    ...
    AbstractChannel.this.eventLoop = eventLoop;
    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        try {
            // At this point the eventLoop starts executing; let's follow it
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            ...
        }
    }
}
We follow the execute method. See io.netty.util.concurrent.SingleThreadEventExecutor#execute
@Override
public void execute(Runnable task) {
    if (task == null) {
        throw new NullPointerException("task");
    }
    boolean inEventLoop = inEventLoop();
    if (inEventLoop) {
        addTask(task);
    } else {
        // We are currently on the main thread, so this branch runs and starts a new thread
        startThread();
        addTask(task);
        if (isShutdown() && removeTask(task)) {
            reject();
        }
    }
    if (!addTaskWakesUp && wakesUpForTask(task)) {
        wakeup(inEventLoop);
    }
}
// This method is called from execute()
private void startThread() {
    if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
        if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
            // Follow up
            doStartThread();
        }
    }
}
// Execution continues here
private void doStartThread() {
    assert thread == null;
    // Finally, a new thread is started through the ThreadPerTaskExecutor
    executor.execute(new Runnable() {
        @Override
        public void run() {
            // Save the current thread in the eventLoop's thread field
            thread = Thread.currentThread();
            if (interrupted) {
                thread.interrupt();
            }
            boolean success = false;
            updateLastExecutionTime();
            try {
                // What happens here? Let's follow it
                SingleThreadEventExecutor.this.run();
                success = true;
            } catch (Throwable t) {
                logger.warn("Unexpected exception from an event executor: ", t);
            } finally {
                ...
            }
        }
    });
}
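The lazy start above boils down to a compare-and-set on a state field, so that only the first caller from outside the loop starts the thread. Here is a minimal sketch of that pattern using plain JDK classes. LazyStartLoopSketch, its fields and runDemo are hypothetical names, and the blocking queue replaces Netty's task queue and selector wakeup for simplicity.

```java
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class LazyStartLoopSketch {
    private static final int NOT_STARTED = 0;
    private static final int STARTED = 1;

    private final AtomicInteger state = new AtomicInteger(NOT_STARTED);
    private final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();
    private volatile Thread thread;

    boolean inEventLoop() {
        return Thread.currentThread() == thread;
    }

    void execute(Runnable task) {
        if (!inEventLoop()) {
            startThread(); // only the first outside caller actually starts a thread
        }
        tasks.add(task);
    }

    private void startThread() {
        if (state.get() == NOT_STARTED && state.compareAndSet(NOT_STARTED, STARTED)) {
            Thread t = new Thread(this::runLoop, "lazy-loop");
            t.setDaemon(true); // the sketch has no shutdown logic, so do not block JVM exit
            t.start();
        }
    }

    private void runLoop() {
        thread = Thread.currentThread(); // remember which thread is "the event loop"
        try {
            for (;;) {
                tasks.take().run();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Submits three tasks from the calling thread and returns how many distinct threads ran them. */
    static int runDemo() {
        LazyStartLoopSketch loop = new LazyStartLoopSketch();
        Set<Thread> seen = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(3);
        for (int i = 0; i < 3; i++) {
            loop.execute(() -> {
                seen.add(Thread.currentThread());
                done.countDown();
            });
        }
        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not run in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen.size();
    }
}
```

Calling LazyStartLoopSketch.runDemo() from the main thread returns 1: all three tasks run on the single lazily started thread, which is the property the state check and compareAndSet protect.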
We continue into SingleThreadEventExecutor.this.run(). See io.netty.channel.nio.NioEventLoop#run
@Override
protected void run() {
    for (;;) {
        try {
            switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                case SelectStrategy.CONTINUE:
                    continue;
                case SelectStrategy.SELECT:
                    // (1) This select should look familiar; think back to Java NIO
                    select(wakenUp.getAndSet(false));
                    ...
                    if (wakenUp.get()) {
                        selector.wakeup();
                    }
                default:
                    // fallthrough
            }
            cancelledKeys = 0;
            needsToSelectAgain = false;
            final int ioRatio = this.ioRatio;
            if (ioRatio == 100) {
                try {
                    // (2) Does this look familiar?
                    processSelectedKeys();
                } finally {
                    // (3) What do we do here?
                    runAllTasks();
                }
            }
            ...
    }
}
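The three numbered steps can be condensed into a small single-threaded loop. This is a simplified sketch under stated assumptions: SelectThenTasksLoopSketch and its methods are hypothetical, no channels are registered (so step 2 is a placeholder), and the choice between selectNow() and a timed select() only imitates the "poll without blocking when tasks are pending" idea.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

final class SelectThenTasksLoopSketch {
    private final Selector selector;
    private final Queue<Runnable> tasks = new ConcurrentLinkedQueue<>();
    private volatile boolean running = true;

    SelectThenTasksLoopSketch() {
        try {
            selector = Selector.open();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    void execute(Runnable task) {
        tasks.add(task);
        selector.wakeup(); // make sure a blocked select() notices the new task
    }

    void stop() {
        execute(() -> running = false);
    }

    /** Runs the loop on the calling thread until stop() has been processed, then closes the selector. */
    void run() {
        try (Selector s = selector) {
            while (running) {
                // (1) select: do not block if tasks are already waiting
                if (tasks.isEmpty()) {
                    s.select(100);
                } else {
                    s.selectNow();
                }
                // (2) process selected keys: placeholder, nothing is registered in this sketch
                s.selectedKeys().clear();
                // (3) run all queued tasks
                Runnable task;
                while ((task = tasks.poll()) != null) {
                    task.run();
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Queues two counting tasks plus a stop request, runs the loop, and returns the count. */
    static int runDemo() {
        SelectThenTasksLoopSketch loop = new SelectThenTasksLoopSketch();
        int[] counter = {0};
        loop.execute(() -> counter[0]++);
        loop.execute(() -> counter[0]++);
        loop.stop();
        loop.run();
        return counter[0];
    }
}
```

runDemo() returns 2: both tasks run in step (3) of the first iteration, followed by the stop request that ends the loop.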
(1) select
We follow this method. See io.netty.channel.nio.NioEventLoop#select
private void select(boolean oldWakenUp) throws IOException {
    // Get the Java NIO selector
    Selector selector = this.selector;
    try {
        int selectCnt = 0;
        long currentTimeNanos = System.nanoTime();
        long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
        // The for loop keeps polling to see whether an event has occurred
        for (;;) {
            long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
            if (timeoutMillis <= 0) {
                if (selectCnt == 0) {
                    selector.selectNow();
                    selectCnt = 1;
                }
                break;
            }
            if (hasTasks() && wakenUp.compareAndSet(false, true)) {
                selector.selectNow();
                selectCnt = 1;
                break;
            }
            int selectedKeys = selector.select(timeoutMillis);
            ...
        }
    }
}
The point to take from the code above is that Netty really does call the Java NIO selector to poll for events, with a timeout derived from a deadline. The code is more complex than a plain select() because it works around a well-known empty-polling bug. We do not discuss that here; it deserves a separate topic later.
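One detail worth checking by hand is the timeout arithmetic: adding 500000 nanoseconds before dividing by 1000000 rounds the remaining time to the nearest millisecond instead of truncating it. The sketch below isolates that single expression; the class and method names are hypothetical.

```java
final class SelectTimeoutSketch {
    /** Same expression as in the loop above: remaining nanoseconds rounded to the nearest millisecond. */
    static long timeoutMillis(long deadlineNanos, long nowNanos) {
        return (deadlineNanos - nowNanos + 500000L) / 1000000L;
    }

    public static void main(String[] args) {
        System.out.println(timeoutMillis(1_499_999L, 0L)); // 1
        System.out.println(timeoutMillis(1_500_000L, 0L)); // 2
        System.out.println(timeoutMillis(400_000L, 0L));   // 0, so the loop would use selectNow()
        System.out.println(timeoutMillis(0L, 600_000L));   // 0, deadline already passed
    }
}
```

Any remaining time below half a millisecond yields 0, which is why the loop checks timeoutMillis <= 0 and falls back to a non-blocking selectNow().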
(2) processSelectedKeys()
Once events have occurred, they are handled in processSelectedKeys(). See: io.netty.channel.nio.NioEventLoop#processSelectedKeys
private void processSelectedKeys() {
    // You may wonder why selectedKeys is a field here instead of being fetched from the selector. Netty has optimized the selector's selectedKeys; we won't go into that now (it deserves a separate topic). Let's follow the main flow first.
    if (selectedKeys != null) {
        // Execution goes here
        processSelectedKeysOptimized(selectedKeys.flip());
    } else {
        processSelectedKeysPlain(selector.selectedKeys());
    }
}
private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
    // As in Java NIO, iterate over the keys to handle the events that occurred
    for (int i = 0;; i ++) {
        final SelectionKey k = selectedKeys[i];
        if (k == null) {
            break;
        }
        selectedKeys[i] = null;
        final Object a = k.attachment();
        if (a instanceof AbstractNioChannel) {
            // Since we created a NioServerSocketChannel, this is the branch that handles our events
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            @SuppressWarnings("unchecked")
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }
        ...
    }
}
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    // Get the unsafe member variable of the current AbstractNioChannel
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    ...
    try {
        // The following code should look familiar again: it is the Java NIO code for handling the various events
        int readyOps = k.readyOps();
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);
            unsafe.finishConnect();
        }
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            ch.unsafe().forceFlush();
        }
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            // Remember what we did here in Java NIO? We created a SocketChannel and registered read events. How Netty handles a new connection is discussed in the next chapter
            unsafe.read();
            if (!ch.isOpen()) {
                return;
            }
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}
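The dispatch above is plain bit testing on readyOps. The following sketch reproduces only the three conditions and reports which action each one would trigger; ReadyOpsSketch and the action strings are hypothetical labels, not Netty code.

```java
import java.nio.channels.SelectionKey;
import java.util.ArrayList;
import java.util.List;

final class ReadyOpsSketch {
    /** Returns, in order, the actions the three if-conditions above would trigger for readyOps. */
    static List<String> actionsFor(int readyOps) {
        List<String> actions = new ArrayList<>();
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            actions.add("finishConnect");
        }
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            actions.add("forceFlush");
        }
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            actions.add("read");
        }
        return actions;
    }

    public static void main(String[] args) {
        System.out.println(actionsFor(SelectionKey.OP_ACCEPT));                          // [read]
        System.out.println(actionsFor(SelectionKey.OP_CONNECT | SelectionKey.OP_WRITE)); // [finishConnect, forceFlush]
        System.out.println(actionsFor(0));                                               // [read]
    }
}
```

Two things stand out: OP_ACCEPT and OP_READ share the same read() path, and a key with readyOps == 0 also goes down that path.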
That concludes the execution logic of eventLoop. Let's review it together with the previous chapter and with Java NIO. We create a NioServerSocketChannel, start the selector polling for events, and register the OP_ACCEPT event on the selector. That is the same flow we wrote by hand in Java NIO, which should make it easier to remember.
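For comparison, here is roughly what that flow looks like in plain Java NIO. It is a self-contained sketch, not code from the previous chapter: the class name, the loopback address, the ephemeral port and the in-process test client are all assumptions made so the example can run on its own.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

final class PlainNioAcceptSketch {
    /** Opens a server channel, registers OP_ACCEPT, connects one client, and returns the number of accepted connections. */
    static int acceptOneConnection() {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false);
            server.bind(new InetSocketAddress("127.0.0.1", 0)); // port 0: let the OS pick a free port
            server.register(selector, SelectionKey.OP_ACCEPT);

            int port = ((InetSocketAddress) server.getLocalAddress()).getPort();
            int accepted = 0;
            try (SocketChannel client = SocketChannel.open(new InetSocketAddress("127.0.0.1", port))) {
                // Poll a bounded number of times so the sketch cannot hang forever
                for (int attempt = 0; attempt < 50 && accepted == 0; attempt++) {
                    selector.select(100);
                    Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                    while (it.hasNext()) {
                        SelectionKey key = it.next();
                        it.remove();
                        if (key.isAcceptable()) {
                            SocketChannel ch = ((ServerSocketChannel) key.channel()).accept();
                            if (ch != null) {
                                accepted++;
                                ch.close();
                            }
                        }
                    }
                }
            }
            return accepted;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("accepted = " + acceptOneConnection());
    }
}
```

The same three ingredients appear as in the Netty walkthrough: a server channel, a selector that is polled in a loop, and an OP_ACCEPT registration whose readiness leads to accept().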