Preface
Feel free to add me on WeChat (dyingGQ) to discuss and learn together.
In the previous article, we split Netty's initAndRegister() method into three parts:
- channel = channelFactory.newChannel();
- init(channel);
- ChannelFuture regFuture = config().group().register(channel);
One detail of the second part, init(channel), was left out, so this article continues with init.
initAndRegister(): init(channel)
The previous article concluded:
init initializes the properties of the NioServerSocketChannel, including its options and its pipeline.
In ServerBootstrap's init method, the ChannelInitializer added to the pipeline contains a detail we skipped: ch.eventLoop().execute.
ChannelPipeline p = channel.pipeline(); // The channel here is NioServerSocketChannel
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
In the ch.eventLoop().execute() line, ch.eventLoop() returns a NioEventLoop, and NioEventLoop.execute() runs the execute method implemented by its parent class SingleThreadEventExecutor:
@Override
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
boolean inEventLoop = inEventLoop();
addTask(task);
if (!inEventLoop) {
startThread();
if (isShutdown()) {
boolean reject = false;
try {
if (removeTask(task)) {
reject = true;
}
} catch (UnsupportedOperationException e) {
// The task queue does not support removal so the best thing we can do is to just move on and
// hope we will be able to pick-up the task before its completely terminated.
// In worst case we will log on termination.
}
if (reject) {
reject();
}
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
Here are two core methods:
addTask(task);
startThread();
1. First, look at addTask(task):
protected void addTask(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
if (!offerTask(task)) {
reject(task);
}
}
final boolean offerTask(Runnable task) {
if (isShutdown()) {
reject();
}
return taskQueue.offer(task);
}
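The offer-or-reject pattern in addTask() and offerTask() can be sketched with JDK classes only. This is a rough illustration, not Netty code: the class name BoundedTaskQueue, its capacity parameter, and the use of RejectedExecutionException as the rejection behavior are all assumptions made for the example.

```java
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;

// Hypothetical stand-in for the executor's task queue; not Netty code.
final class BoundedTaskQueue {
    private final Queue<Runnable> taskQueue;

    BoundedTaskQueue(int capacity) {
        this.taskQueue = new LinkedBlockingQueue<>(capacity);
    }

    // Same shape as addTask(): a null task is an error, a failed offer rejects the task.
    void addTask(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }
        if (!taskQueue.offer(task)) {
            throw new RejectedExecutionException("task queue is full");
        }
    }

    int size() {
        return taskQueue.size();
    }

    public static void main(String[] args) {
        BoundedTaskQueue queue = new BoundedTaskQueue(1);
        queue.addTask(() -> System.out.println("first task"));
        try {
            queue.addTask(() -> System.out.println("second task"));
        } catch (RejectedExecutionException e) {
            // The queue holds one task, so the second offer fails.
            System.out.println("second task rejected, queued=" + queue.size());
        }
    }
}
```

Note that the task is only queued here; nothing runs it until a thread drains the queue, which is what startThread() sets up.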
Breakpoint tracing shows that the ch passed to public void initChannel(final Channel ch) is the NioServerSocketChannel that initAndRegister() created first, via channel = channelFactory.newChannel();
2. Next, look at the startThread() method:
private void startThread() {
if (state == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
try {
doStartThread();
} catch (Throwable cause) {
STATE_UPDATER.set(this, ST_NOT_STARTED);
PlatformDependent.throwException(cause);
}
}
}
}
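The compareAndSet guard in startThread() is what ensures the thread is started only once, even if several callers race. Below is a minimal sketch of that guard in isolation. The class LazyStarter and its startCount field are hypothetical; the counter merely stands in for the call to doStartThread().

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

// Hypothetical illustration of a CAS-guarded one-time start; not Netty code.
final class LazyStarter {
    private static final int ST_NOT_STARTED = 1;
    private static final int ST_STARTED = 2;

    private static final AtomicIntegerFieldUpdater<LazyStarter> STATE_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(LazyStarter.class, "state");

    private volatile int state = ST_NOT_STARTED;
    private final AtomicInteger startCount = new AtomicInteger();

    void startThread() {
        // Cheap volatile read first, then an atomic transition that only one caller can win.
        if (state == ST_NOT_STARTED) {
            if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                startCount.incrementAndGet(); // stands in for doStartThread()
            }
        }
    }

    int startCount() {
        return startCount.get();
    }

    public static void main(String[] args) throws InterruptedException {
        LazyStarter starter = new LazyStarter();
        Thread[] callers = new Thread[8];
        for (int i = 0; i < callers.length; i++) {
            callers[i] = new Thread(starter::startThread);
            callers[i].start();
        }
        for (Thread caller : callers) {
            caller.join();
        }
        System.out.println("starts = " + starter.startCount()); // starts = 1
    }
}
```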
Go straight to the core logic, doStartThread():
private void doStartThread() {
assert thread == null;
executor.execute(new Runnable() {
@Override
public void run() {
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
boolean success = false;
updateLastExecutionTime();
try {
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
for (;;) {
int oldState = state;
if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
break;
}
}
// Check if confirmShutdown() was called at the end of the loop.
if (success && gracefulShutdownStartTime == 0) {
if (logger.isErrorEnabled()) {
logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must " +
"be called before run() implementation terminates.");
}
}
try {
// Run all remaining tasks and shutdown hooks.
for (;;) {
if (confirmShutdown()) {
break;
}
}
} finally {
try {
cleanup();
} finally {
// Lets remove all FastThreadLocals for the Thread as we are about to terminate and notify
// the future. The user may block on the future and once it unblocks the JVM may terminate
// and start unloading classes.
// See https://github.com/netty/netty/issues/6596.
FastThreadLocal.removeAll();
STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
threadLock.release();
if (!taskQueue.isEmpty()) {
if (logger.isWarnEnabled()) {
logger.warn("An event executor terminated with " +
"non-empty task queue (" + taskQueue.size() + ')');
}
}
terminationFuture.setSuccess(null);
}
}
}
}
});
}
The core logic here is SingleThreadEventExecutor.this.run(). Following the call to its implementation, this is clearly a NioEventLoop.
The source of NioEventLoop's run() is as follows:
@Override
protected void run() {
for (;;) {
try {
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.BUSY_WAIT:
// fall-through to SELECT since the busy-wait is not supported with NIO
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
// 'wakenUp.compareAndSet(false, true)' is always evaluated
// before calling 'selector.wakeup()' to reduce the wake-up
// overhead. (Selector.wakeup() is an expensive operation.)
//
// However, there is a race condition in this approach.
// The race condition is triggered when 'wakenUp' is set to
// true too early.
//
// 'wakenUp' is set to true too early if:
// 1) Selector is waken up between 'wakenUp.set(false)' and
// 'selector.select(...)'. (BAD)
// 2) Selector is waken up between 'selector.select(...)' and
// 'if (wakenUp.get()) { ... }'. (OK)
//
// In the first case, 'wakenUp' is set to true and the
// following 'selector.select(...)' will wake up immediately.
// Until 'wakenUp' is set to false again in the next round,
// 'wakenUp.compareAndSet(false, true)' will fail, and therefore
// any attempt to wake up the Selector will fail, too, causing
// the following 'selector.select(...)' call to block
// unnecessarily.
//
// To fix this problem, we wake up the selector again if wakenUp
// is true immediately after selector.select(...).
// It is inefficient in that it wakes up the selector for both
// the first case (BAD - wake-up required) and the second case
// (OK - no wake-up required).
if (wakenUp.get()) {
selector.wakeup();
}
// fall through
default:
}
} catch (IOException e) {
// If we receive an IOException here its because the Selector is messed up. Let's rebuild
// the selector and retry. https://github.com/netty/netty/issues/8566
rebuildSelector0();
handleLoopException(e);
continue;
}
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
// Always handle shutdown even if the loop processing threw an exception.
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}
This is an infinite loop that keeps executing the logic. Let's focus first on select(wakenUp.getAndSet(false)):
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
try {
int selectCnt = 0;
long currentTimeNanos = System.nanoTime();
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
for (;;) {
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
if (timeoutMillis <= 0) {
if (selectCnt == 0) {
selector.selectNow();
selectCnt = 1;
}
break;
}
// If a task was submitted when wakenUp value was true, the task didn't get a chance to call
// Selector#wakeup. So we need to check task queue again before executing select operation.
// If we don't, the task might be pended until select operation was timed out.
// It might be pended until idle timeout if IdleStateHandler existed in pipeline.
if (hasTasks() && wakenUp.compareAndSet(false, true)) {
selector.selectNow();
selectCnt = 1;
break;
}
int selectedKeys = selector.select(timeoutMillis);
selectCnt ++;
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
// - Selected something,
// - waken up by user, or
// - the task queue has a pending task.
// - a scheduled task is ready for processing
break;
}
if (Thread.interrupted()) {
// Thread was interrupted so reset selected keys and break so we not run into a busy loop.
// As this is most likely a bug in the handler of the user or it's client library we will
// also log it.
//
// See https://github.com/netty/netty/issues/2426
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely because " +
"Thread.currentThread().interrupt() was called. Use " +
"NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
}
selectCnt = 1;
break;
}
long time = System.nanoTime();
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
// timeoutMillis elapsed without anything selected.
selectCnt = 1;
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
// The code exists in an extra method to ensure the method is not too big to inline as this
// branch is not very likely to get hit very frequently.
selector = selectRebuildSelector(selectCnt);
selectCnt = 1;
break;
}
currentTimeNanos = time;
}
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
}
} catch (CancelledKeyException e) {
if (logger.isDebugEnabled()) {
logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
selector, e);
}
// Harmless exception - log anyway
}
}
The select() method also uses for (;;) to run its logic in an infinite loop. Unlike the loop in run(), which only exits on shutdown, the loop in select() has several break conditions.
Again, let’s look at the key logic:
int selectedKeys = selector.select(timeoutMillis);
This is plain NIO code: a blocking select with a timeout, which returns when keys have been selected or when the timeout expires.
If an event arrives, or one of the other conditions below holds, the loop breaks:
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
// - Selected something,
// - waken up by user, or
// - the task queue has a pending task.
// - a scheduled task is ready for processing
break;
}
1. The for (;;) loop in run() starts by executing select() in the switch:
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
Inside the switch, select() runs its own loop. Normally it blocks in selector.select(timeoutMillis) until an event is selected or the timeout expires, then returns to the main loop in run() so that the subsequent methods can execute. So far, then, we have a large loop containing a small loop that listens for events.
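The blocking behavior of selector.select(timeoutMillis) can be observed with the JDK alone. The sketch below is not Netty code; the class TimedSelectSketch and the helper selectOnce are hypothetical names. It shows the two ways the call returns when no channel is ready: the timeout expires, or a wakeup() was requested.

```java
import java.io.IOException;
import java.nio.channels.Selector;

// Hypothetical illustration of a timed select on a plain JDK Selector; not Netty code.
final class TimedSelectSketch {
    // Blocks for at most timeoutMillis and returns the number of ready keys.
    static int selectOnce(Selector selector, long timeoutMillis) throws IOException {
        return selector.select(timeoutMillis);
    }

    public static void main(String[] args) throws IOException {
        try (Selector selector = Selector.open()) {
            // No channel is registered, so this call can only end by timing out.
            long start = System.nanoTime();
            int ready = selectOnce(selector, 200);
            long waitedMillis = (System.nanoTime() - start) / 1_000_000;
            System.out.println("ready=" + ready + ", waited about " + waitedMillis + " ms");

            // A wakeup requested before select() makes the next select() return immediately.
            selector.wakeup();
            ready = selectOnce(selector, 10_000);
            System.out.println("after wakeup: ready=" + ready);
        }
    }
}
```

The second call is the plain-JDK counterpart of what the wakenUp flag coordinates in Netty: a thread that submits a task wakes the selector so the loop does not sit out the full timeout.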
After select() returns from the switch, the core logic that follows is executed:
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
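The expression ioTime * (100 - ioRatio) / ioRatio turns the time just spent on I/O into a time budget for tasks. A small worked example makes the proportions visible. The class IoRatioBudget and the method taskBudgetNanos are hypothetical names; only the arithmetic is taken from the code above.

```java
// Hypothetical helper reproducing only the arithmetic of the timed branch; not Netty code.
final class IoRatioBudget {
    // ioRatio is the percentage of loop time intended for I/O; the rest goes to tasks.
    static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        if (ioRatio <= 0 || ioRatio >= 100) {
            // ioRatio == 100 is handled by the other branch, which runs all tasks without a budget.
            throw new IllegalArgumentException("ioRatio must be in 1..99 for the timed branch");
        }
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }

    public static void main(String[] args) {
        long ioTime = 8_000_000L; // 8 ms spent in processSelectedKeys()
        System.out.println(taskBudgetNanos(ioTime, 50)); // 8000000: tasks get as long as I/O took
        System.out.println(taskBudgetNanos(ioTime, 80)); // 2000000: tasks get a quarter of the I/O time
        System.out.println(taskBudgetNanos(ioTime, 20)); // 32000000: tasks get four times the I/O time
    }
}
```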
Either branch ends up in the same two pieces of logic, so let's focus on these two core methods:
processSelectedKeys();
runAllTasks();
First, the method that processes the selected keys, processSelectedKeys():
private void processSelectedKeys() {
if (selectedKeys != null) {
processSelectedKeysOptimized();
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}
Debugging with breakpoints shows that when the Netty server starts, selectedKeys is not null but its size is 0, so execution enters processSelectedKeysOptimized().
Let’s go through both:
private void processSelectedKeysOptimized() {
for (int i = 0; i < selectedKeys.size; ++i) {
final SelectionKey k = selectedKeys.keys[i];
// null out entry in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys.keys[i] = null;
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
if (needsToSelectAgain) {
// null out entries in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys.reset(i + 1);
selectAgain();
i = -1;
}
}
}
private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
// check if the set is empty and if so just return to not create garbage by
// creating a new Iterator every time even if there is nothing to process.
// See https://github.com/netty/netty/issues/597
if (selectedKeys.isEmpty()) {
return;
}
Iterator<SelectionKey> i = selectedKeys.iterator();
for (;;) {
final SelectionKey k = i.next();
final Object a = k.attachment();
i.remove();
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
if (!i.hasNext()) {
break;
}
if (needsToSelectAgain) {
selectAgain();
selectedKeys = selector.selectedKeys();
// Create the iterator again to avoid ConcurrentModificationException
if (selectedKeys.isEmpty()) {
break;
} else {
i = selectedKeys.iterator();
}
}
}
}
As you can see, both paths end up in processSelectedKey(), which has two overloads:
processSelectedKey(SelectionKey k, AbstractNioChannel ch)
processSelectedKey(SelectionKey k, NioTask<SelectableChannel> task)
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
// If the channel implementation throws an exception because there is no event loop, we ignore this
// because we are only trying to determine if ch is registered to this event loop and thus has authority
// to close ch.
return;
}
// Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
// and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
// still healthy and should not be closed.
// See https://github.com/netty/netty/issues/5125
if (eventLoop != this || eventLoop == null) {
return;
}
// close the channel if the key is not valid anymore
unsafe.close(unsafe.voidPromise());
return;
}
try {
int readyOps = k.readyOps();
// We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
// the NIO JDK channel implementation may throw a NotYetConnectedException.
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
private static void processSelectedKey(SelectionKey k, NioTask<SelectableChannel> task) {
int state = 0;
try {
task.channelReady(k.channel(), k);
state = 1;
} catch (Exception e) {
k.cancel();
invokeChannelUnregistered(task, k, e);
state = 2;
} finally {
switch (state) {
case 0:
k.cancel();
invokeChannelUnregistered(task, k, null);
break;
case 1:
if (!k.isValid()) {
// Cancelled by channelReady()
invokeChannelUnregistered(task, k, null);
}
break;
}
}
}
We will focus on the first overload here. Its main logic uses bitwise AND on readyOps to determine which event occurred, and then performs the corresponding operation:
if ((readyOps & SelectionKey.OP_CONNECT) != 0)
if ((readyOps & SelectionKey.OP_WRITE) != 0)
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0)
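Because readyOps is a bit set, a single key can report several ready operations at once, and each check above fires independently. The following sketch, which is not Netty code, applies the same three checks to plain integers built from the JDK's SelectionKey constants. The class ReadyOpsSketch and the method describe are hypothetical names; the returned strings simply name the unsafe method each check would lead to.

```java
import java.nio.channels.SelectionKey;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of the readyOps bit tests; not Netty code.
final class ReadyOpsSketch {
    // Returns, in order, the names of the unsafe methods the three checks would trigger.
    static List<String> describe(int readyOps) {
        List<String> actions = new ArrayList<>();
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            actions.add("finishConnect");
        }
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            actions.add("forceFlush");
        }
        // readyOps == 0 is also treated as readable, matching the workaround in the original check.
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            actions.add("read");
        }
        return actions;
    }

    public static void main(String[] args) {
        System.out.println(describe(SelectionKey.OP_READ | SelectionKey.OP_WRITE)); // [forceFlush, read]
        System.out.println(describe(SelectionKey.OP_ACCEPT));                       // [read]
        System.out.println(describe(0));                                            // [read]
    }
}
```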
Finally, it calls several methods on unsafe, which is obtained via final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
unsafe.finishConnect();
ch.unsafe().forceFlush();
unsafe.read();
public interface NioUnsafe extends Unsafe {
/**
* Return underlying {@link SelectableChannel}
*/
SelectableChannel ch();
/**
* Finish connect
*/
void finishConnect();
/**
* Read from underlying {@link SelectableChannel}
*/
void read();
void forceFlush();
}
The first thing to remember here is a few Unsafe inner classes:
protected abstract class AbstractNioUnsafe extends AbstractUnsafe implements NioUnsafe
protected class NioByteUnsafe extends AbstractNioUnsafe
private final class NioMessageUnsafe extends AbstractNioUnsafe
AbstractNioUnsafe is an inner class of AbstractNioChannel, NioByteUnsafe is an inner class of AbstractNioByteChannel, and NioMessageUnsafe is an inner class of AbstractNioMessageChannel.
These methods are important, but for now just keep them in mind; we will set them aside and continue with the rest of the flow.
That covers processSelectedKeys(), the second step of the loop. NioEventLoop's run() method gives Netty this endless loop, whose three core steps are:
- select(wakenUp.getAndSet(false));
- processSelectedKeys();
- runAllTasks();
The third step is runAllTasks().
As the name suggests, it runs tasks.
protected boolean runAllTasks() {
assert inEventLoop();
boolean fetchedAll;
boolean ranAtLeastOne = false;
do {
fetchedAll = fetchFromScheduledTaskQueue();
if (runAllTasksFrom(taskQueue)) {
ranAtLeastOne = true;
}
} while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.
if (ranAtLeastOne) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
}
afterRunningAllTasks();
return ranAtLeastOne;
}
So here we’re looking at runAllTasksFrom(taskQueue)
protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {
Runnable task = pollTaskFrom(taskQueue);
if (task == null) {
return false;
}
for (;;) {
safeExecute(task);
task = pollTaskFrom(taskQueue);
if (task == null) {
return true;
}
}
}
Here, tasks are polled from the NioEventLoop's taskQueue (the queue that addTask filled earlier) and executed:
protected static Runnable pollTaskFrom(Queue<Runnable> taskQueue) {
for (;;) {
Runnable task = taskQueue.poll();
if (task == WAKEUP_TASK) {
continue;
}
return task;
}
}
protected static void safeExecute(Runnable task) {
try {
task.run();
} catch (Throwable t) {
logger.warn("A task raised an exception. Task: {}", task, t);
}
}
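Taken together, runAllTasksFrom(), pollTaskFrom() and safeExecute() drain a queue, skip a sentinel wakeup task, and keep going when a task throws. A compact sketch of that combination follows. It is not Netty code: the class DrainTasksSketch is hypothetical, it returns a count instead of a boolean to make the result easier to inspect, and it prints to System.err where Netty uses its logger.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical illustration of draining a task queue; not Netty code.
final class DrainTasksSketch {
    // Sentinel that only exists to unblock a waiting thread; it is never run.
    static final Runnable WAKEUP_TASK = new Runnable() {
        @Override
        public void run() {
            // intentionally empty
        }
    };

    // Polls the next real task, skipping the sentinel; returns null when the queue is empty.
    static Runnable pollTaskFrom(Queue<Runnable> taskQueue) {
        for (;;) {
            Runnable task = taskQueue.poll();
            if (task == WAKEUP_TASK) {
                continue;
            }
            return task;
        }
    }

    // Runs every queued task and returns how many were attempted.
    static int runAllTasksFrom(Queue<Runnable> taskQueue) {
        int attempted = 0;
        for (Runnable task = pollTaskFrom(taskQueue); task != null; task = pollTaskFrom(taskQueue)) {
            try {
                task.run();
            } catch (Throwable t) {
                // One failing task must not stop the rest of the queue from running.
                System.err.println("A task raised an exception: " + t);
            }
            attempted++;
        }
        return attempted;
    }

    public static void main(String[] args) {
        Queue<Runnable> taskQueue = new ArrayDeque<>();
        taskQueue.offer(() -> System.out.println("task 1"));
        taskQueue.offer(WAKEUP_TASK);
        taskQueue.offer(() -> { throw new IllegalStateException("boom"); });
        taskQueue.offer(() -> System.out.println("task 2"));
        System.out.println("attempted = " + runAllTasksFrom(taskQueue)); // attempted = 3
    }
}
```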
At this point, the taskQueue does not have any tasks. We will explain when there are tasks in a later article.
That completes the main logic of startThread().
Conclusion
This article, a middle installment, is rather long, so let's summarize. It mainly covers the execution logic of Netty's custom Executor, which is also the key logic by which Netty processes tasks.
Core class: SingleThreadEventExecutor. Core methods:
// Overrides the task submission method
public void execute(Runnable task)
protected void addTask(Runnable task)
private void startThread()
private void doStartThread()
protected abstract void run();
protected boolean runAllTasks()
NioEventLoop implements the abstract run() method of SingleThreadEventExecutor:
protected void run()
private void processSelectedKeys()
private void select(boolean oldWakenUp)
The most important is the NioEventLoop implementation of the run() method called in the doStartThread() method
SingleThreadEventExecutor.this.run();
The run() method loops over three steps:
- select: blocks with a timeout while selecting events
- processSelectedKeys: handles the events through the unsafe methods
- runAllTasks: executes all tasks in the taskQueue
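The three steps above can be put together in a toy event loop built on a plain JDK Selector and a queue. This is only a sketch under simplifying assumptions, not how NioEventLoop is implemented: the class MiniEventLoop, its fixed 100 ms timeout, and its shutdown() method are hypothetical, and the key-processing step only removes keys because no channel is registered.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical toy event loop: select, process keys, run tasks. Not Netty code.
final class MiniEventLoop implements Runnable {
    private final Selector selector;
    private final Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<>();
    private volatile boolean shuttingDown;
    private int tasksRun; // written only by the loop thread

    MiniEventLoop() throws IOException {
        this.selector = Selector.open();
    }

    // Queues a task and wakes the selector so the loop does not wait out the timeout.
    void execute(Runnable task) {
        taskQueue.offer(task);
        selector.wakeup();
    }

    void shutdown() {
        shuttingDown = true;
        selector.wakeup();
    }

    int tasksRun() {
        return tasksRun;
    }

    @Override
    public void run() {
        try {
            while (!shuttingDown || !taskQueue.isEmpty()) {
                selector.select(100);                                   // step 1: timed select
                Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
                while (keys.hasNext()) {                                // step 2: process selected keys
                    keys.next(); // a real loop would dispatch on readyOps() here
                    keys.remove();
                }
                for (Runnable task; (task = taskQueue.poll()) != null; ) { // step 3: run all tasks
                    task.run();
                    tasksRun++;
                }
            }
            selector.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws Exception {
        MiniEventLoop loop = new MiniEventLoop();
        Thread thread = new Thread(loop, "mini-event-loop");
        thread.start();
        for (int i = 1; i <= 3; i++) {
            int id = i;
            loop.execute(() -> System.out.println("task " + id + " on " + Thread.currentThread().getName()));
        }
        loop.shutdown();
        thread.join();
        System.out.println("tasks run = " + loop.tasksRun()); // tasks run = 3
    }
}
```

All tasks run on the single loop thread, which is the property that lets handler code avoid locking.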
Finally, there is one very important point you may not have noticed. Here is the source code again:
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
During init, ch.eventLoop().execute is not actually executed; init only declares the overridden initChannel method. The actual execution happens in register, covered next.
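The difference between declaring a callback and executing it is easy to see in a few lines. The sketch below only illustrates the ordering described here and is not Netty code: the class DeferredInitSketch, its log, and the Runnable standing in for the ChannelInitializer are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of "declared in init, executed in register"; not Netty code.
final class DeferredInitSketch {
    private final List<String> log = new ArrayList<>();
    private Runnable initializer; // stands in for the ChannelInitializer added to the pipeline

    // Only stores the callback; its body does not run here.
    void init() {
        log.add("init: initializer declared");
        initializer = () -> log.add("initChannel executed");
    }

    // The stored callback is invoked as part of registration.
    void register() {
        log.add("register: channel registered");
        initializer.run();
    }

    List<String> log() {
        return log;
    }

    public static void main(String[] args) {
        DeferredInitSketch sketch = new DeferredInitSketch();
        sketch.init();
        sketch.register();
        // [init: initializer declared, register: channel registered, initChannel executed]
        System.out.println(sketch.log());
    }
}
```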
After the walkthrough above, I hope you now have a concrete understanding of this flow. The next article begins the register journey. Thanks for reading, and I hope you found it helpful.
I am Dying Stranded, and I always look forward to meeting you. Whether you expect it or not, tides come and go, and I am right here...
See you next time.