Scan the qr code below or search the wechat official account, cainiao Feiyafei, you can follow the wechat official account, read more Spring source code analysis and Java concurrent programming articles.
Problem 1.
When the Netty server is started, it can start receiving connections from clients. How does a server create a new connection in Netty? Before you start reading the source code, consider the following three questions.
- How does the server detect new client access requests (new connection access)?
- In JDK native NIO, the server will pass ServerSocketChannel. The accept () for the new access to the client to create the corresponding client channel, so in the netty server and how to handle the new connection access?
- In Netty, the NioEventLoop thread is used to read and write network IO. How does the client channel bind to the NioEventLoop thread in the worker thread pool?
2. Check for new connections
NioEventLoop (NioEventLoop) : NioEventLoop (NioEventLoop) : NioEventLoop (NioEventLoop) : NioEventLoop (NioEventLoop) When network IO events are processed, when the IO event type is OP_ACCEPT (as shown in the following code), a new client is connected to the server, that is, a new connection is detected. At this point, the server channel reads the new connection.
if((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) ! =0 || readyOps == 0) {
unsafe.read();
}
Copy the code
As you can see, the unsafe.read() method is called when an OP_ACCEPT event is required to access a new connection. In this case, the unsafe object is an instance of the NioMessageUnsafe type. Why? Because the OP_ACCEPT event is of interest only to the server channel, the unsafe property in the server channel stores instances of the NioMessageUnsafe type.
The source code for the read() method is long, but it does two things. First, it calls the doReadMessages() method to read connections. Second: the read connection is propagated through the pipeline in the server channel, finally executing the channelRead() method in each handler.
3. Create a client channel
After listening to the OP_ACCEPT event, the server channel creates a client channel for the new connection. Subsequent data is read and written through this client channel. This client channel is created through the doReadMessages() method, which is defined in NioServerSocketChannel, and the source code is shown below.
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = SocketUtils.accept(javaChannel());
try {
if(ch ! =null) {
// wrap the native client channel as the client channel in netty: NioSocketChannel
buf.add(new NioSocketChannel(this, ch));
return 1; }}catch (Throwable t) {
// Exception log printing...
}
return 0;
}
Copy the code
In this method, we first get the JDK’s native server channel, ServerSocketChannel, via javaChannel(). This native server channel is stored in the NioServerSocketChannel’s CH attribute, which is assigned when NioServerSocketChannel is initialized (see this article). Netty source code analysis series server Channel initialization. Once a JDK native server channel is created, a JDK native client channel is created using the SocketUtils utility class. SocketUtils this tool the underlying implementation of a class, is actually calling JDK native API, namely the ServerSocketChannel. The accept ().
After creating a native SocketChannel, netty needs to wrap it into the server channel type defined in netty, namely, NioSocketChannel. How do you pack it? This is wrapped using the NioSocketChannel constructor via the new keyword call. In the constructor, a lot of initialization is done. Tracing the source, we find the following constructor called to the AbstractNioChannel class.
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
Parent = NioServerSocketChannel, ch = SocketChannel(JDK native client channel), readInterestOp = OP_READ
super(parent);
// Save channels and interested events
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
// Set it to non-blocking
ch.configureBlocking(false);
} catch (IOException e) {
// Exception handling...}}Copy the code
In this constructor, we first save the native client channel and the events of interest to the client channel, and then set the blocking mode of the client channel to false, indicating that it does not block. (In NIO network programming, this step is required, otherwise starting will report an error.) It also calls the parent class constructor, which is AbstractChannel. The constructor of the AbstractChannel class is shown below.
protected AbstractChannel(Channel parent) {
// Parent has the value NioServerSocketChannel
this.parent = parent;
id = newId();
// For the client channel, the unsafe is NioSocketChannelUnsafe
unsafe = newUnsafe();
// DefaultChannelPipeline
pipeline = newChannelPipeline();
}
Copy the code
In this constructor, for client channels, the parent value is NioServerSocketChannel, which is the server channel created when the netty server starts. Unsafe is then created, NioSocketChannelUnsafe, and a default pipeline is created for the client channel, which has the following structure. (If you have read the previous articles, you may have noticed that this constructor is also called when a server-side channel is created.)
Finally, a NioSocketChannelConfig object is created for NioSocketChannel. This object is used to store some TCP configurations and properties set by the user for the client channel. When creating this config object, Sets the TCP_NODELAY parameter of TCP to true. By default, TCP accumulates small packets into large packets before sending them. Netty sends small packets in a timely manner. Therefore, TCP_NODELAY is set to true, indicating that the packets are not sent late.
At this point, the client channel corresponding to the new connection is created. Subsequent network data reading and writing are based on this NioSocketChannel.
4. Binding NioEventLoop
When a client channel is created, the read() method propagates the client channel through pipeline through a line of code called pipeline.FireChannelRead (socketChannel). Execute the channelRead() method of each handler in the pipeline in turn. (Note that the pipeline is stored in the server channel. When creating a client channel, a pipeline is also created for each new client channel. Don’t confuse this.)
When the server is started, the structure diagram of pipeline in the server channel is as follows (refer to these three articles for detailed explanation: Server Channel initialization for Netty source analysis series, server Channel registration for Netty source analysis series, and port binding for Netty server Channel.
The channelRead() method does nothing for head and tail, and propagates directly to the next node. What is important here is the channelRead() method of the ServerBootstrapAcceptor node. The source code for this method is below.
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
// Add a user-defined childHandler to the client channel
child.pipeline().addLast(childHandler);
// Save the attributes configured by the user for the client channel
setChannelOptions(child, childOptions, logger);
for(Entry<AttributeKey<? >, Object> e: childAttrs) { child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue()); }try {
// Register the client channel to the worker thread pool by selecting a NioEventloop from the workerGroup and binding the client channel to the NioEventloop
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if(! future.isSuccess()) { forceClose(child, future.cause()); }}}); }catch(Throwable t) { forceClose(child, t); }}Copy the code
In this method, first add a childHandler to the node in the client channel pipeline. This childHandler is defined by the user. As you can see in the figure below, the user defines a ChannelInitializer childHandler using the childHandler() method. This now adds the childHandler to the node in the client channel’s Pipeline (this is important, but will be used later). Then use setChannelOptions to save the TCP parameters and properties configured by the user for the client channel.
The most important step is in childGroup.register(Child), which registers the client channel with one of the NioEventloops in the workerGroup thread pool. (In the process of server port binding, it is similar to calling the register() method of NioEventLoopGroup to register the server channel to a NioEventLoop in the bossGroup).
The childGroup is the workerGroup (the Reactor master slave thread pool in the Reactor model). The register() method is called as follows.
public ChannelFuture register(Channel channel) {
// The next() method selects a NioEventLoop from the NioEventLoop
return next().register(channel);
}
Copy the code
The next() method selects a NioEventLoop from the NioEventLoop (see next() for details: NioEventLoop: SingleThreadEventLoop: NioEventLoop: SingleThreadEventLoop So the last call here is the following register() method in SingleThreadEventLoop.
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
/** * For client channels * promise is DefaultChannelPromise * promise.channel() gets NioSocketChannel * Promise.channel ().unsafe() gets NioSocketChannelUnsafe * Since NioSocketChannelUnsafe inherits AbstractUnsafe, So when you call safe.register(), the Register () method */ of the AbstractUnsafe class is called
/ / this is NioEventLoop
promise.channel().unsafe().register(this, promise);
return promise;
}
Copy the code
Unsafe () gets the NioSocketChannelUnsafe object. Since NioSocketChannelUnsafe inherits AbstractUnsafe, when unsafe. Register () is called, The Register () method of the AbstractUnsafe class is called. The simplified source code for this method is shown below.
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
// omit some code....
// For client channels, this step is to assign the eventLoop attribute to NioSocketChannel
AbstractChannel.this.eventLoop = eventLoop;
Register0 () = register0();
if (eventLoop.inEventLoop()) {
// Synchronize execution
register0(promise);
} else {
try {
// Commit to the NioEventLoop thread and execute asynchronously
eventLoop.execute(new Runnable() {
@Override
public void run(a) { register0(promise); }}); }catch (Throwable t) {
// Omit some code}}}Copy the code
In fact, this method is called when a server-side channel registers with a NioEventLoop (see this article on server-side channel registration in the Netty source code analysis series).
For the client, in this method, the client channel is bound to a NioEventLoop with the following line of code, which answers the third question at the beginning of this article.
AbstractChannel.this.eventLoop = eventLoop;
Copy the code
It then determines whether the current thread is equal to the thread saved in the eventLoop passed in, which it certainly is not. Why is that? Because the current thread is a thread in the bossGroup thread group and eventLoop is a thread in the workerGroup thread group, false is returned and the register0() method is executed asynchronously. The source code for the register0() method is shown below.
private void register0(ChannelPromise promise) {
try {
// omit some code...
boolean firstRegistration = neverRegistered;
/** * For client channels, all the doRegister() method does is register the server channel with the multiplexer */
doRegister();
neverRegistered = false;
registered = true;
// The handlerAdded method is executed
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
Executes the channelRegistered() method of each ChannelHandler through pipeline propagation
pipeline.fireChannelRegistered();
// If the client channel is already active, perform the following logic.
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if(config().isAutoRead()) { beginRead(); }}}catch (Throwable t) {
// omit some code...}}Copy the code
In the register0() method, there are three important steps of logic: doRegister(); The second: pipeline invokeHandlerAddedIfNeeded (); Third: pipeline. FireChannelRegistered (). Here’s what each of these steps does.
DoRegister () is a step towards actually registering a client channel with the multiplexer. AbstractNioChannel doRegister() calls the AbstractNioChannel doRegister() method.
protected void doRegister(a) throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0.this);
return;
} catch (CancelledKeyException e) {
// Exception handling......}}}Copy the code
JavaChannel () gets the native SocketChannel in the JDK.
EventLoop ().unwrappedSelector() takes the native multiplex Selector in the JDK (the underlying data structure has been replaced). (The unwrappedSelector property in EventLoop is initialized when the NioEventLoop is created, and the underlying data structure is replaced at that time.)
So javaChannel().register(eventLoop().unwrappedSelector(), 0, this) This is done by calling the JDK’s native SocketChannel register(selector, OPS, attR) method and registering the client Channel with a multiplexer selector.
Note that when calling the JDK’s native register() method, the third argument is passed this, which represents the current NioSocketChannel object. You save this as an attachment to a multiplexer Selector, and the nice thing about that is that you can then get the channel of the client through the multiplexer Selector. The second argument is passed in as 0, indicating that the client channel is now registered with the multiplexer, and that the event identifier of interest to the client Chennel is 0, meaning that it is not interested in any events at this point (the events of interest will be set to OP_READ later).
When doRegister () method is performed, will perform the second step: pipeline. InvokeHandlerAddedIfNeeded (), it do is a callback handler in the pipeline handlerAdded () method.
To perform the code is executed to pipeline. The fireChannelRegistered (), also is the third step we mentioned above. What this step does is propagate the Channel registration event. How? Start with the header handler in the pipeline and execute each handler’s channelRegistered() method in turn.
As mentioned earlier, an anonymous class of ChannelInitializer type is added to the client channel pipeline, so when propagating the channelRegistered() method, The anonymous class’s channelRegistered() method is executed, which ultimately executes the initChannel(channel) method overridden in the anonymous class, as shown in the following figure. For details on how to call the initChannel(channel) method, see this article: Server channel registration for Netty source Code Analysis series. However, the best way to read the source code is to do it yourself, Debug you may experience a deeper, easier to understand.
Returning to the register0() method, isActive() is finally checked to see if it is true, and since the client channel is already registered with the multiplexer, it returns true, and since the client channel is registered for the first time, So will pipeline. FireChannelActive () this one line of code, namely will spread through the client channel pipeline down to perform all the handler channelActive () method, The final call to AbstractChannel’s doBeginRead() method is complex. DEBUG is recommended. The source code for the doBeginRead method is as follows.
protected void doBeginRead(a) throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
final SelectionKey selectionKey = this.selectionKey;
if(! selectionKey.isValid()) {return;
}
readPending = true;
/** * When a client channel is registered with the multiplexer, InterestOps = 0 * selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); * /
final int interestOps = selectionKey.interestOps();
/** * The value of the readInterestOp property, which is set to selectionkey.op_read */ in the NioSocketChannel constructor
if ((interestOps & readInterestOp) == 0) {
/ / for the client channel, interestOps | for OP_READ readInterestOp operation results
// Finally, the selectionKey is interested in the OP_READ event. At this point, the client channel can finally start receiving the client link.selectionKey.interestOps(interestOps | readInterestOp); }}Copy the code
At this point, the client channel is interested in the OP_READ event, which is then ready to read and write data.
5. To summarize
This article focuses on how when a new connection comes in, the Netty server creates a client channel for the new connection and binds it to the NioEventLoop thread. The client channel registration process is very similar to the server channel registration process, the call process is almost the same, so it is recommended to read this article Netty source code analysis series server channel registration.
recommended
- How to evolve from BIO to NIO to Netty
- Netty source Code Analysis series Reactor thread model
- Netty source code analysis series server Channel initialization
- Netty source code analysis series server Channel registration
- Netty source code analysis series server Channel port binding
- Netty source Code Analysis series NioEventLoop implementation process