The problem
- In native JDK NIO code, a server channel is registered with a multiplexer (selector) by calling serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT). How does Netty register the server channel with the selector, and what extra work does it do during registration?
- The previous article in this Netty source code analysis series, on server channel initialization, showed that the init(channel) method adds an anonymous ChannelInitializer to the pipeline, and that the initChannel(channel) method of this anonymous class performs important logic: it adds two handlers to the pipeline. That article only described the result of running initChannel(channel), not how the code calls back into it. This article explains that callback in detail.
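For reference, the native JDK registration mentioned in the first question can be sketched as follows. This is a minimal JDK-only example, not Netty code, and the class and method names (NativeNioRegistration, registerForAccept) are made up for illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

final class NativeNioRegistration {
    // Registers a non-blocking server channel for accept events and returns
    // the interest set stored in the resulting SelectionKey.
    static int registerForAccept() {
        try (Selector selector = Selector.open();
             ServerSocketChannel serverSocketChannel = ServerSocketChannel.open()) {
            // A channel must be in non-blocking mode before it can be registered.
            serverSocketChannel.configureBlocking(false);
            SelectionKey key = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            return key.interestOps();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(registerForAccept() == SelectionKey.OP_ACCEPT);
    }
}
```

In plain NIO the interest in OP_ACCEPT is declared at registration time. The rest of this article shows that Netty splits this into two steps.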
Detailed review
When serverBootstrap.bind(port) is called, execution reaches AbstractBootstrap.doBind(localAddress), and doBind() calls initAndRegister(). A simplified version of initAndRegister() looks like this.
final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        // newChannel() creates the channel by reflection, which ends up calling
        // the no-argument constructor of NioServerSocketChannel.
        channel = channelFactory.newChannel();
        // Initialize the channel. Among other things, this adds an anonymous
        // ChannelInitializer to the channel's pipeline. Its initChannel(channel)
        // method is called back later, during registration.
        init(channel);
    } catch (Throwable t) {
        // omit some code...
    }
    ChannelFuture regFuture = config().group().register(channel);
    // omit some code...
    return regFuture;
}
The initAndRegister() method initializes the server channel (the NioServerSocketChannel) and registers it with the multiplexer. Its first half, server channel initialization, was examined in the previous article in this series. During initialization, channelFactory.newChannel() uses reflection to call the no-argument constructor of NioServerSocketChannel and create a NioServerSocketChannel instance. The constructor initializes many attributes of NioServerSocketChannel, such as pipeline and unsafe. The init(channel) method is then called, which sets options, attrs, and other properties on the NioServerSocketChannel. Most importantly, it adds an anonymous class of type ChannelInitializer to the NioServerSocketChannel's pipeline.
When init(channel) is done, execution continues with ChannelFuture regFuture = config().group().register(channel);. This line is the focus of today's analysis. Its main job is to register the server channel with the multiplexer.
The register(channel) source
ChannelFuture regFuture = config().group().register(channel);
In this line, config() returns the ServerBootstrapConfig object, which holds the properties we configured for the Netty server, such as bossGroup, workerGroup, option, handler, and childHandler. (bossGroup and workerGroup are the main and sub Reactor thread pools, that is, the NioEventLoopGroup instances we created with the following code.)
// The thread group that handles the connection
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
// The thread group that handles the IO and business logic
NioEventLoopGroup workerGroup = new NioEventLoopGroup(8);
config().group() returns the bossGroup we set for the server, so config().group().register(channel) actually calls the register(channel) method of NioEventLoopGroup. NioEventLoopGroup extends MultithreadEventLoopGroup, and register(channel) is defined in MultithreadEventLoopGroup, so the call lands in MultithreadEventLoopGroup.register(channel). (Netty's class hierarchy is complex, so it is best to step through the source with the IDEA debugger. Otherwise it is sometimes hard to tell which class holds the implementation of a method.)
The source of MultithreadEventLoopGroup.register(channel) is as follows.
public ChannelFuture register(Channel channel) {
    return next().register(channel);
}
A NioEventLoopGroup is a thread group that contains a set of NioEventLoops. The next() method picks one NioEventLoop from the group in round-robin order, and register(channel) is then executed on that NioEventLoop.
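The round-robin selection done by next() can be illustrated with a small JDK-only sketch. RoundRobinChooser is a hypothetical class written for this article, not Netty's chooser implementation. It only shows the idea of handing out group members one after another.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

final class RoundRobinChooser<T> {
    private final List<T> members;
    private final AtomicInteger index = new AtomicInteger();

    RoundRobinChooser(List<T> members) {
        if (members.isEmpty()) {
            throw new IllegalArgumentException("members must not be empty");
        }
        this.members = new ArrayList<>(members);
    }

    // Returns the members one after another, wrapping around at the end.
    T next() {
        // floorMod keeps the position non-negative even after the counter overflows.
        return members.get(Math.floorMod(index.getAndIncrement(), members.size()));
    }

    public static void main(String[] args) {
        RoundRobinChooser<String> group =
                new RoundRobinChooser<>(Arrays.asList("loop-0", "loop-1", "loop-2"));
        for (int i = 0; i < 4; i++) {
            System.out.println(group.next());
        }
    }
}
```

For the bossGroup created above with a single thread, every call to next() would return the same event loop.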
When nioEventloop. register(channel) is called, the register(channel) of the SingleThreadEventLoop class is actually called. SingleThreadEventLoop. Register (channel) source as follows.
public ChannelFuture register(Channel channel) {
    return register(new DefaultChannelPromise(channel, this));
}
Here channel is the NioServerSocketChannel and this is the NioEventLoop. The method creates a ChannelPromise and stores both the channel and the NioEventLoop in it, so that they can easily be retrieved from the ChannelPromise later.
Another overloaded register(channelPromise) method in SingleThreadEventLoop is then called. The source code is as follows.
public ChannelFuture register(final ChannelPromise promise) {
    ObjectUtil.checkNotNull(promise, "promise");
    // For the server:
    //   promise is a DefaultChannelPromise
    //   promise.channel() returns the NioServerSocketChannel
    //   promise.channel().unsafe() returns a NioMessageUnsafe
    // NioMessageUnsafe inherits from AbstractUnsafe, so unsafe().register()
    // calls the register() method of the AbstractUnsafe class.
    // "this" is the NioEventLoop.
    promise.channel().unsafe().register(this, promise);
    return promise;
}
In register(final ChannelPromise promise), promise.channel() returns the NioServerSocketChannel. (As mentioned earlier, the NioServerSocketChannel was stored in the promise when the ChannelPromise was created, so it is available here.)
promise.channel().unsafe() is therefore NioServerSocketChannel.unsafe(), which returns a NioMessageUnsafe object. When was this object stored in the NioServerSocketChannel? When the constructor of NioServerSocketChannel is invoked by reflection, the unsafe attribute is initialized in the constructor of one of its parent classes. For a server channel, the unsafe attribute is a NioMessageUnsafe instance. For a client channel, it is a NioSocketChannelUnsafe instance. (This is worth remembering: accepting new connections and reading and writing data are later built on these two kinds of unsafe objects.)
Since this is a server channel, promise.channel().unsafe().register(this, promise) calls the register(this, promise) method of NioMessageUnsafe. NioMessageUnsafe inherits from AbstractUnsafe, register(this, promise) is defined in AbstractUnsafe, and NioMessageUnsafe does not override it, so the call ends up in AbstractUnsafe.register(this, promise). We have finally reached the core code!
The abridged source of AbstractUnsafe.register(this, promise) is shown below.
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    // omit some code...
    // For a server channel, this assigns the eventLoop attribute of the NioServerSocketChannel.
    AbstractChannel.this.eventLoop = eventLoop;
    // Decide whether register0() runs synchronously or asynchronously.
    if (eventLoop.inEventLoop()) {
        // Execute synchronously
        register0(promise);
    } else {
        try {
            // Submit to the NioEventLoop thread and execute asynchronously
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            // omit some code...
        }
    }
}
The logic above has two parts. First, AbstractChannel.this.eventLoop = eventLoop assigns the eventLoop attribute of the NioServerSocketChannel. From then on the server channel is bound to this NioEventLoop, and all of its operations are performed by that thread. Second, eventLoop.inEventLoop() determines whether register0() is executed synchronously or asynchronously.
eventLoop.inEventLoop() checks whether the current thread is the thread stored in the eventLoop. If it is, register0() is executed synchronously. If not, register0() is executed asynchronously. Here the current thread is the main() thread, which cannot be the thread in the eventLoop, so register0() is submitted to the eventLoop and executed asynchronously.
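The "run inline if already on the loop thread, otherwise submit" decision can be sketched with plain JDK classes. MiniEventLoop and its methods are hypothetical names used only for this illustration. The sketch assumes a single-threaded executor as a stand-in for a NioEventLoop and is not how Netty implements its event loop.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

final class MiniEventLoop {
    private final ExecutorService executor;
    private volatile Thread loopThread;

    MiniEventLoop() {
        // The single worker thread is created lazily, on the first submitted task.
        executor = Executors.newSingleThreadExecutor(task -> {
            Thread thread = new Thread(task, "mini-event-loop");
            thread.setDaemon(true);
            loopThread = thread;
            return thread;
        });
    }

    // True only when the caller is the loop's own thread.
    boolean inEventLoop() {
        return Thread.currentThread() == loopThread;
    }

    // Runs the task directly on the loop thread, or hands it over from any other thread.
    void register(Runnable register0) {
        if (inEventLoop()) {
            register0.run();
        } else {
            executor.execute(register0);
        }
    }

    // Registers from the calling thread and reports which thread ran the task.
    static String threadThatRuns() {
        MiniEventLoop loop = new MiniEventLoop();
        AtomicReference<String> ranOn = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        loop.register(() -> {
            ranOn.set(Thread.currentThread().getName());
            done.countDown();
        });
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            loop.executor.shutdown();
        }
        return ranOn.get();
    }

    public static void main(String[] args) {
        System.out.println(threadThatRuns());
    }
}
```

Called from main(), the task is always handed over to the loop thread, which mirrors the asynchronous branch taken during server startup.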
So far we have not seen any code that registers the server channel with the multiplexer, which means it must happen in register0(). Let's look at the code of register0().
private void register0(ChannelPromise promise) {
    try {
        // omit some code...
        boolean firstRegistration = neverRegistered;
        // For a server channel, all doRegister() does is register the server
        // channel with the multiplexer.
        doRegister();
        neverRegistered = false;
        registered = true;
        // The handlerAdded() callbacks are executed here.
        pipeline.invokeHandlerAddedIfNeeded();
        safeSetSuccess(promise);
        // Propagate through the pipeline and execute the channelRegistered()
        // method of each ChannelHandler.
        pipeline.fireChannelRegistered();
        // If the server channel is already active, the logic below runs.
        // Here isActive() returns false and the if block is skipped, because
        // the server channel is not yet bound to a port.
        if (isActive()) {
            // omit some code...
        }
    } catch (Throwable t) {
        // omit some code...
    }
}
The register0() method has three important steps. First: doRegister(). Second: pipeline.invokeHandlerAddedIfNeeded(). Third: pipeline.fireChannelRegistered(). Here is what each step does.
First, doRegister(). This is the step that actually registers the server channel with the multiplexer. For NioServerSocketChannel, doRegister() is inherited from AbstractNioChannel, so AbstractNioChannel.doRegister() is called.
protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
            return;
        } catch (CancelledKeyException e) {
            // Exception handling...
        }
    }
}
javaChannel() returns the JDK's native ServerSocketChannel.
eventLoop().unwrappedSelector() returns the JDK's native multiplexer Selector, whose underlying data structure has been replaced by Netty. (The unwrappedSelector attribute is initialized when the NioEventLoop is created, and the data structure is replaced at that time.)
So javaChannel().register(eventLoop().unwrappedSelector(), 0, this) calls the register(selector, ops, att) method of the JDK's native ServerSocketChannel and registers the server channel with the multiplexer Selector.
Note that the third argument passed to the JDK's register() method is this, the current NioServerSocketChannel object. It is saved as the attachment of the registration, so the channel can later be retrieved from the SelectionKey handed back by the multiplexer. The second argument is 0, which means the server channel is registered with an interest set of 0, i.e., it is not interested in any event yet. Interest in accept events is only set after the server channel is bound to its port.
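The two-step pattern described above (register with interest set 0 and an attachment, enable OP_ACCEPT later) can be reproduced with the JDK alone. In this sketch DeferredInterestRegistration is a hypothetical class, and a plain Object stands in for the NioServerSocketChannel that Netty passes as the attachment.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

final class DeferredInterestRegistration {
    // Returns {interest ops right after registration,
    //          interest ops after enabling accept,
    //          1 if the key's attachment is the object passed to register(), else 0}.
    static int[] registerThenEnableAccept() {
        Object owner = new Object(); // stand-in for the channel object passed as "this"
        try (Selector selector = Selector.open();
             ServerSocketChannel channel = ServerSocketChannel.open()) {
            channel.configureBlocking(false);
            // Step 1: register without any interest, but with an attachment.
            SelectionKey key = channel.register(selector, 0, owner);
            int before = key.interestOps();
            // Step 2 (later): switch on interest in accept events.
            key.interestOps(before | SelectionKey.OP_ACCEPT);
            int after = key.interestOps();
            return new int[] {before, after, key.attachment() == owner ? 1 : 0};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] result = registerThenEnableAccept();
        System.out.println(result[0] + " " + result[1] + " " + result[2]);
    }
}
```

Keeping the owner object as the attachment is what lets selection-loop code map a ready key back to the channel that owns it.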
After doRegister() completes, the second step runs: pipeline.invokeHandlerAddedIfNeeded(). This step calls back the handlerAdded() method of the handlers in the pipeline. The source of invokeHandlerAddedIfNeeded() is as follows.
final void invokeHandlerAddedIfNeeded() {
    assert channel.eventLoop().inEventLoop();
    if (firstRegistration) {
        firstRegistration = false;
        // The logic here is executed only during the first registration.
        // Call back the handlerAdded() method of all handlers.
        callHandlerAddedForAllHandlers();
    }
}
callHandlerAddedForAllHandlers() is executed only when the channel is registered for the first time. The core logic is in callHandlerAddedForAllHandlers().
private void callHandlerAddedForAllHandlers() {
    final PendingHandlerCallback pendingHandlerCallbackHead;
    synchronized (this) {
        assert !registered;
        // This Channel itself was registered.
        registered = true;
        pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
        this.pendingHandlerCallbackHead = null;
    }
    // When was the pendingHandlerCallbackHead attribute initialized?
    PendingHandlerCallback task = pendingHandlerCallbackHead;
    while (task != null) {
        task.execute();
        task = task.next;
    }
}
From the source of callHandlerAddedForAllHandlers() we can see that its core logic is the while (task != null) loop, which executes each task in turn. The initial task is the value of this.pendingHandlerCallbackHead, that is, DefaultChannelPipeline.pendingHandlerCallbackHead. So the question is: when was pendingHandlerCallbackHead initialized? (This is where it starts to get confusing, because the code jumps around. The debugger in IDEA is useful here.)
In initAndRegister(), init(channel) is called, and it adds an anonymous class of type ChannelInitializer to the pipeline via pipeline.addLast(). The review section of this article mentioned how important this step is, and now we can see why.
A call to pipeline.addLast() eventually reaches DefaultChannelPipeline's addLast(EventExecutorGroup group, String name, ChannelHandler handler) method. Its source is as follows.
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
    final AbstractChannelHandlerContext newCtx;
    synchronized (this) {
        checkMultiplicity(handler);
        newCtx = newContext(group, filterName(name, handler), handler);
        addLast0(newCtx);
        // registered defaults to false. While it is false, the channel has not
        // yet been registered with the eventLoop.
        if (!registered) {
            // Checks that the handlerState attribute is 0 and sets it to 1 (add pending).
            newCtx.setAddPending();
            // Wrap the ChannelHandlerContext in a PendingHandlerCallback (essentially
            // a Runnable task) and add it to the callback queue.
            callHandlerCallbackLater(newCtx, true);
            return this;
        }
        // Returns the executor that runs this context's handler.
        EventExecutor executor = newCtx.executor();
        if (!executor.inEventLoop()) {
            callHandlerAddedInEventLoop(newCtx, executor);
            return this;
        }
    }
    callHandlerAdded0(newCtx);
    return this;
}
In the code above, addLast0(newCtx) is what actually adds the handler to the pipeline. After that, callHandlerCallbackLater(newCtx, true) is executed. As the name suggests, it arranges for the handler's callback method to be executed later. Let's look at the logic of callHandlerCallbackLater(newCtx, true).
private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
    assert !registered;
    // added is true here
    PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
    PendingHandlerCallback pending = pendingHandlerCallbackHead;
    if (pending == null) {
        pendingHandlerCallbackHead = task;
    } else {
        // Find the tail of the linked-list.
        while (pending.next != null) {
            pending = pending.next;
        }
        pending.next = task;
    }
}
What is the ctx argument? It is the AbstractChannelHandlerContext object that wraps the anonymous ChannelInitializer we created earlier. The added argument is true, so the task created is a PendingHandlerAddedTask(ctx), and this task ends up assigned to the pendingHandlerCallbackHead attribute.
Returning to callHandlerAddedForAllHandlers(), we now know that pendingHandlerCallbackHead holds a PendingHandlerAddedTask(ctx), so task.execute() is the execute() method of the PendingHandlerAddedTask. execute() calls callHandlerAdded0(ctx), which in turn calls ctx.callHandlerAdded(), where ctx is the AbstractChannelHandlerContext that wraps the anonymous ChannelInitializer. Tracing ctx.callHandlerAdded() further shows that it eventually calls the handlerAdded() method of the handler object. So this is where handlerAdded() is called back: it is ultimately triggered by pipeline.invokeHandlerAddedIfNeeded().
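The deferred-callback mechanism traced above (queue handlerAdded callbacks in a linked list while the channel is unregistered, then drain the list at registration) can be condensed into a small sketch. PendingCallbackPipeline and its members are hypothetical names. The sketch is single-threaded and leaves out the synchronization and handler state tracking that the real pipeline has.

```java
import java.util.ArrayList;
import java.util.List;

final class PendingCallbackPipeline {
    // One node of the singly linked list of deferred callbacks.
    private static final class PendingTask {
        final Runnable callback;
        PendingTask next;

        PendingTask(Runnable callback) {
            this.callback = callback;
        }
    }

    private PendingTask pendingHead;
    private boolean registered;

    // Before registration the callback is appended to the list; afterwards it runs at once.
    void addHandler(Runnable handlerAdded) {
        if (registered) {
            handlerAdded.run();
            return;
        }
        PendingTask task = new PendingTask(handlerAdded);
        if (pendingHead == null) {
            pendingHead = task;
            return;
        }
        PendingTask tail = pendingHead;
        while (tail.next != null) {
            tail = tail.next;
        }
        tail.next = task;
    }

    // Marks the pipeline as registered and runs every queued callback in insertion order.
    void markRegistered() {
        registered = true;
        PendingTask task = pendingHead;
        pendingHead = null;
        while (task != null) {
            task.callback.run();
            task = task.next;
        }
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        PendingCallbackPipeline pipeline = new PendingCallbackPipeline();
        pipeline.addHandler(() -> log.add("handlerAdded: initializer"));
        pipeline.addHandler(() -> log.add("handlerAdded: second"));
        log.add("registered");
        pipeline.markRegistered();
        pipeline.addHandler(() -> log.add("handlerAdded: late"));
        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

Deferring the callbacks means a handler added before registration never has its handlerAdded logic run before the channel has an event loop to run it on.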
Back in register0(), once pipeline.invokeHandlerAddedIfNeeded() completes, execution moves on to pipeline.fireChannelRegistered(), the third step mentioned earlier. This step propagates the channel registration event. How? Starting from the head node of the pipeline, it executes each handler's channelRegistered() method in turn.
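The head-to-tail propagation can be pictured as a chain of contexts in which each handler does its work and then forwards the event to the next context. The following is a simplified sketch with hypothetical types (RegisteredEventChain, Context, Handler). Netty's real contexts also handle executors, handler state, and exceptions.

```java
import java.util.ArrayList;
import java.util.List;

final class RegisteredEventChain {
    interface Handler {
        void channelRegistered(Context ctx);
    }

    static final class Context {
        final String name;
        final Handler handler;
        Context next;

        Context(String name, Handler handler) {
            this.name = name;
            this.handler = handler;
        }

        // Passes the event on to the next context, if there is one.
        void fireChannelRegistered() {
            if (next != null) {
                next.handler.channelRegistered(next);
            }
        }
    }

    // Builds head -> names[0] -> names[1] -> ... and fires the event at the head.
    // Returns the context names in the order their handlers were invoked.
    static List<String> propagate(String... names) {
        List<String> visited = new ArrayList<>();
        Handler logAndForward = ctx -> {
            visited.add(ctx.name);
            ctx.fireChannelRegistered();
        };
        Context head = new Context("head", logAndForward);
        Context tail = head;
        for (String name : names) {
            tail.next = new Context(name, logAndForward);
            tail = tail.next;
        }
        head.handler.channelRegistered(head);
        return visited;
    }

    public static void main(String[] args) {
        System.out.println(propagate("initializer", "tail"));
    }
}
```

A handler that does not call fireChannelRegistered() on its context stops the event at that point, which is why forwarding handlers call it explicitly.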
Following the calls step by step, you can see that pipeline.fireChannelRegistered() reaches the invokeChannelRegistered() method of the AbstractChannelHandlerContext object. Its source is below.
private void invokeChannelRegistered() {
    if (invokeHandler()) {
        try {
            // handler() returns the handler object
            ((ChannelInboundHandler) handler()).channelRegistered(this);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    } else {
        fireChannelRegistered();
    }
}
handler() returns the handler object wrapped by the current AbstractChannelHandlerContext, for example the anonymous ChannelInitializer created earlier. The handler's channelRegistered(this) method is then called, so the channelRegistered(this) method of the anonymous ChannelInitializer is eventually called. Let's see what it does.
public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
    // Normally this method will never be called as handlerAdded(...) should call initChannel(...) and remove
    // the handler.
    // Look at the logic of initChannel(ctx)
    if (initChannel(ctx)) {
        // we called initChannel(...) so we need to call now pipeline.fireChannelRegistered() to ensure we not
        // miss an event.
        ctx.pipeline().fireChannelRegistered();
        // We are done with init the Channel, removing all the state for the Channel now.
        removeState(ctx);
    } else {
        // Called initChannel(...) before which is the expected behavior, so just forward the event.
        ctx.fireChannelRegistered();
    }
}
As you can see, it first calls initChannel(ctx), and then calls either ctx.pipeline().fireChannelRegistered() or ctx.fireChannelRegistered(). As their names suggest, these two methods continue propagating the event so that the channelRegistered() method of the other handlers in the pipeline is executed, and we can ignore them here. The focus is initChannel(ctx). Note: the parameter of initChannel(ctx) is of type ChannelHandlerContext, while the parameter of initChannel(channel) is of type Channel. Do not confuse the two overloads.
The source code of the initChannel(ctx) method is shown below.
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
    if (initMap.add(ctx)) { // Guard against re-entrance.
        try {
            // The initChannel(channel) method of the anonymous ChannelInitializer is called here.
            initChannel((C) ctx.channel());
        } catch (Throwable cause) {
            // Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...).
            // We do so to prevent multiple calls to initChannel(...).
            exceptionCaught(ctx, cause);
        } finally {
            // Remove this node from the pipeline.
            ChannelPipeline pipeline = ctx.pipeline();
            if (pipeline.context(this) != null) {
                pipeline.remove(this);
            }
        }
        return true;
    }
    return false;
}
You can see that initChannel(ctx) first calls initChannel(channel). Because the anonymous ChannelInitializer created earlier overrides initChannel(channel), the overriding method is the one that runs. The following screenshot shows the code for creating the ChannelInitializer anonymous class instance.
In the overriding initChannel(channel) method, the handler we configured for the server in main() is added to the pipeline, and then, asynchronously via ch.eventLoop().execute(), a ServerBootstrapAcceptor handler is added to the pipeline. ServerBootstrapAcceptor is the handler that deals with incoming client connections.
Now we finally know when initChannel(channel) is called.
However, we are not done yet. Going back to initChannel(ctx), an important operation happens in the finally block: the handler represented by the anonymous ChannelInitializer is removed from the pipeline. The purpose of this anonymous class is to set up the pipeline structure of the server channel during initialization and registration. Once those are complete, it has no further use, because the server channel is initialized only once, when the service starts. It is therefore removed, and the final pipeline of the server channel contains, in order: the head node, the handler configured for the server (if any), ServerBootstrapAcceptor, and the tail node.
At this point, the analysis of initAndRegister() is finally complete.
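The "initialize once, then remove yourself" behaviour of the ChannelInitializer can be summarized with a toy pipeline modeled as a list of names. SelfRemovingInitializer is a hypothetical class. The sketch assumes both handlers are added synchronously, whereas Netty adds ServerBootstrapAcceptor through a task submitted to the event loop.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class SelfRemovingInitializer {
    // Runs the one-time setup against the handler list, then removes the
    // initializer's own placeholder entry, even if the setup throws.
    static List<String> initPipeline(Consumer<List<String>> initChannel) {
        List<String> pipeline = new ArrayList<>();
        pipeline.add("head");
        pipeline.add("initializer");
        pipeline.add("tail");
        try {
            initChannel.accept(pipeline);
        } finally {
            pipeline.remove("initializer");
        }
        return pipeline;
    }

    static List<String> demo() {
        return initPipeline(pipeline -> {
            // New handlers go in front of the tail node.
            pipeline.add(pipeline.size() - 1, "userHandler");
            pipeline.add(pipeline.size() - 1, "acceptor");
        });
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Doing the removal in a finally block mirrors the source above: the initializer leaves the pipeline whether or not the setup code succeeds.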
Conclusion
- This article analyzed the second half of initAndRegister(), the registration of the server channel. Registration ultimately calls the register() method of the JDK's native ServerSocketChannel to register the server channel with the multiplexer Selector.
- Following the code step by step, it also described in detail how initChannel(channel) of the anonymous ChannelInitializer instance is called back, and how the final pipeline structure of the server channel is formed.
- At this point, initialization and registration of the Netty server channel are complete, but server startup is not finished. One step remains: binding the server channel to a port. The next article analyzes the port binding process.
Recommended
- How to evolve from BIO to NIO to Netty
- Netty source code analysis series: Reactor thread model
- Netty source code analysis series: server Channel initialization