An overview of the
It took another three days or so to understand the whole framework of the Reactor model. It was fucking hard to learn.
Relationship between EventLoopGroup and EventLoop
Take a look at the inheritance of EventLoop and EventLoopGroup, and some key methods and objectsThere are several key pieces of information to note
- NioEventLoopGroup contains a Childern array that stores nioEventLoops
2. NioEventLoopGroup submit,execute and other thread pool methods actually select an EventLoop from Childern to execute
3. NioEventLoop is a single-threaded actuator. To hold aSelector
, the thread just iterates over the Selector, handles the event on that Selector, and thenSee if there are any other tasks in the queue that need to be executed
, such as a new Channel registration.The diagram above shows the code in action below
The source code to read
This source analysis is mainly based on the above figure to find the corresponding source
EventLoopGroup
Research on EventLoopGroup mainly starts from several aspects
- Loop array creation process
- How are tasks assigned to the internal loop after being submitted to the group
Loop array creation process
There are a lot of eventLoops in the EventLoopGroupNioEventLoopGroup newChild is implemented in the NioEventLoopGroup class, so all arrays are nioEventLoops
How are tasks assigned to the internal loop after being submitted to the group
The execute method is called when we’re looking for a thread pool to submit a task. Let’s look at the EventLoopGroup source code call
@Override
public void execute(Runnable command) {
next().execute(command);
}
Copy the code
Call an abstract method and perform the execute, the realization of the abstract methods in MultithreadEventexecutorGroup
@Override
public EventExecutor next(a) {
return chooser.next();
}
Copy the code
This Chooser is already initialized in the constructor and contains the entire EventLoop array
@Override
public EventExecutorChooser newChooser(EventExecutor[] executors) {# check the length of the array2Multiple of PI, if it is, use another onechoose
if (isPowerOfTwo(executors.length)) {
return new PowerOfTwoEventExecutorChooser(executors);
} else {
return newGenericEventExecutorChooser(executors); }}private static boolean isPowerOfTwo(int val) {# this bit operation is to determine whether2The number of powerreturn(val & -val) == val; } # these are all round arrays, just to increase efficiencyprivate static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
@Override
public EventExecutor next(a) {
return executors[idx.getAndIncrement() & executors.length - 1]; }}private static final class GenericEventExecutorChooser implements EventExecutorChooser {
@Override
public EventExecutor next(a) {
returnexecutors[Math.abs(idx.getAndIncrement() % executors.length)]; }}Copy the code
EventLoopGroup summary
So we know that an EventLoopGroup is a bunch of EventLoopgroups, and then somebody calls the thread pool method, and they find an EventLoop in the array and throw it to them to execute. So the next logical focus is EventLoop
EventLoop
EventLoop is designed to understand the architecture again, but other details, such as channels, promises, etc., will be covered in other chapters.
- EventLoop listens for the event’s code logic for handling the event
- What is the process by which BossGroup registers new connections to WorkerGroup
EventLoop listens for the event’s code logic for handling the event
Before we start, let’s look at what happens when an EventLoop is created. Recall that the EventLoop creation is implemented in the newChild method of the NioEventLoopGroup constructor
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
if (strategy == null) {
throw new NullPointerException("selectStrategy");
}
provider = selectorProvider;
final SelectorTuple selectorTuple = openSelector();
selector = selectorTuple.selector;
unwrappedSelector = selectorTuple.unwrappedSelector;
selectStrategy = strategy;
}
Copy the code
Here you can see that you put a provider and a selector, and then call the constructor of the parent class
protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
boolean addTaskWakesUp, int maxPendingTasks,
RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
tailTasks = newTaskQueue(maxPendingTasks);
}
protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
return new LinkedBlockingQueue<Runnable>(maxPendingTasks);
}
Copy the code
The parent class creates a LinkedBlockingQueue and then calls the parent class’s constructor
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
boolean addTaskWakesUp, int maxPendingTasks,
RejectedExecutionHandler rejectedHandler) {
super(parent);
this.addTaskWakesUp = addTaskWakesUp;
this.maxPendingTasks = Math.max(16, maxPendingTasks);
this.executor = ObjectUtil.checkNotNull(executor, "executor");
taskQueue = newTaskQueue(this.maxPendingTasks);
rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}
protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
return new LinkedBlockingQueue<Runnable>(maxPendingTasks);
}
Copy the code
You can see that the parent class holds an executor of ThreadPerTaskExecutor and then a LinkedBlockingQueue. Call the parent class to save which EventLoopGroup you belong to. At this point, we know what’s inside a NioEventLoop
To understand the logic for handling event notifications, start withexecute
Methods to obtain, in SingleThreadEventExecutor class implements the execute method
@Override
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task"); } # check whether the EventLoop thread called this methodboolean inEventLoop = inEventLoop();
if(inEventLoop) {# If yes, add the task to the queue. }else{# otherwise start an EventLoop thread startThread(); Queue addTask(task);if(isShutdown() && removeTask(task)) { reject(); }}if(! addTaskWakesUp && wakesUpForTask(task)) { wakeup(inEventLoop); }} # add task to queueprotected void addTask(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
if (!offerTask(task)) {
reject(task);
}
}
final boolean offerTask(Runnable task) {
if (isShutdown()) {
reject();
}
return taskQueue.offer(task);
}
Copy the code
If the method is not called from an Executor thread, then try to open the thread pool
private void startThread(a) {# note that if the thread pool is not opened, it will not be opened, so this is a single thread poolif (state == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
try {
doStartThread();
} catch (Throwable cause) {
STATE_UPDATER.set(this, ST_NOT_STARTED); PlatformDependent.throwException(cause); }}}} # I deleted a large number of statements irrelevant to this logicprivate void doStartThread(a) {
assert thread == null;
executor.execute(new Runnable() {
@Override
public void run(a) Thread = thread.currentThread (); # perform abstract methods run SingleThreadEventExecutor.this.run(); }}); }Copy the code
Can be seen from the above code, SingleThreadEventExecutor keep single-threaded logic is that to tasks in the queue, if the state of the thread pool is not open, open it. So executor’s execute method can only be executed once, on a single thread. So our logic comes into this abstract run method that I took the first screenshot of, so let’s take a closer look at the logic.
@Override
protected void run(a) {# create an infinite loop where the entire EventLoop thread is stuck. Why is that oneloop
for (;;) {
tryEventLoop () {EventLoop () {EventLoop () {EventLoop () {EventLoop ()Select
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
caseSELECT (wakenup.getandSet (wakenup.getandSet (wakenup.getandSet (wakenup.getandSet (wakenup.getandSet ())))false));
if (wakenUp.get()) {
selector.wakeup();
}
// fall through
default:
}
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try{# handle selector generated by selectorselectorKey
processSelectedKeys(a);
} finally {
// Ensure we always run tasks.RunAllTasks (); }}else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100- ioRatio) / ioRatio); }}}catch (Throwable t) {
handleLoopException(t);
}
// Always handle shutdown even if the loop processing threw an exception.
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return; }}}catch(Throwable t) { handleLoopException(t); }}}Copy the code
When out of the select (wakenUp getAndSet (false)); So when we loop, we’re either saying that there’s a taskQueue task, or that the selector has received the data, and we’re doing both of those things in our code. So let’s do the Selector
private void processSelectedKeysOptimized(a) {
for (int i = 0; i < selectedKeys.size; ++i) {
final SelectionKey k = selectedKeys.keys[i];
selectedKeys.keys[i] = null;
finalObject a = k.attachment(); processSelectedKey(k, (AbstractNioChannel) a); }}Copy the code
Iterate over the key produced by this selector. Call the processSelectedKey method, in which the event is distributed
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
try {
intreadyOps = k.readyOps(); OP_CONNECT event is raised if the client connects to the serverif((readyOps & SelectionKey.OP_CONNECT) ! =0) {
intops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); unsafe.finishConnect(); } # write events are not registered. If there is a buffer in the kernel, write events will be responded immediatelyif((readyOps & SelectionKey.OP_WRITE) ! =0) { ch.unsafe().forceFlush(); } # read events and accept client eventsif((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) ! =0 || readyOps == 0) { unsafe.read(); }}catch(CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); }}Copy the code
Even though OP_WRITE is rarely used, the unsafe.read () and OP_ACCEPT () methods are both used for reading events
What is the process by which BossGroup registers new connections to WorkerGroup
The above analysis stops at the safe.read method, which handles both read events and client connection events, which brings us to our second question, how Netty drops a client connection from the BossGroup into the WorkerGroup. First, analyze the process after the event from the perspective of NIO
- The BossGroup NioEventLoop accepts a Channel, which is the client.
- The BossGroup registers this Channel with the Selector of the EventLoop in the WorkerGroup.
- The EventLoop in the WorkerGroup then proceeds to iterate over its Selector
Because a large number of Channel and Pileline operations are involved in this process, it is necessary to skip this process and look directly at the results to sort out the overall process
Unbroadening. Read Already reads the client’s Channel, in readBufThe read data will come inServerBootStrap
In channelRead, Boos and Worker groups are bound at startupAnd we know that this childGroup is a workergroupThen we call next’s register, and we know we’re picking a register from our array, so the method goes to the register in the SingleThreadEventLoop. And wrapped a PromiseThis step is to register the Channel with the EventLoop, which has a Selector in it, so you need to find the Selector. Register in the NIO methodAbstractChannel. Note that many Netty methods will check the inEventLoop, but don’t really care, just know that it will end up in the queue of the Loop and wait for the Loop to pull the taskNow what’s going on in register0This doRegister lets different channels implement custom registration methods, so let’s go straight to AbstractNioChannelIt’s pretty clear up to this point that the boss is done assigning a channel to the worker’s selector, but there is one more point that the registered listening event is 0. Because there is a bug in the JDK,Netty author’s explanation. If 0 is registered successfully, the listener event will be registered again, which is shown by a breakpoint in AbstractNioChannel’s doBeginRead method
conclusion
So far the whole process is finished, review again
- BossGroup starts EventLoop, which is a single-thread loop that keeps listening for events generated by the selector. In the run method of NioEventLoop
- Process the event in the processSelectedKey method of the NioEventLoop when it is heard. Found the registration event, so go to the ServerBootStrap channelRead method,
This completes the step of the boss passing channel to the worker
- The worker gets the channel and registers it with her own
selector
And then I’m going to go through my selector. Complete event listening