An overview of the
Note: NIO2(AIO) is asynchronous IO
NIO2 profile
AIO and NIO2 are actually the same thing, just as Sun Walker and Sun Walker are actually Sun monkey, but the names are different and the essence is the same
So how do you understand this concept? For example
Let’s say a girl has a lot of sparetires. What is the easiest and most efficient way for a girl to get something done?
The answer is, lick the dog so much, leave it to them. So while the dog is doing his job, does the girl wait for the dog to get it done? No, of course you can continue to assign other tasks to other dogs in the meantime. When the dog during the affairs, if there is a need for sister to deal with things, notice processing can be.
Of course, dogs tend to do the heavy lifting, such as copying data, I/O, accepting new connections and so on. Girls focus on the core business.
In this case, the sister is the core business thread, which is used to process the business logic, while the dogs are the abstraction (kernel +I/O thread).
P.S.
-
If you know about NIO2, it is recommended that you read the NIO2 model interpretation chapter directly instead of reading the NIO2 DEMO chapter (time is precious).
-
You can skip through all the chapters to see the summary, or you can simply read the appendix and start debugging the code directly
NIO2 DEMO
There is a core point in NIO2 where the kernel is responsible for notifying the application of what is happening, and the application still needs to provide threads for receiving connections and copying data
talk is cheap, show me your hair
If you want to learn about NIO2, you can click Learn
The source code comments for GBK code, if you see comments for garbled, it is best to change it to GBK code
This is a Demo, and it’s worth noting that although this example does not explicitly create a thread pool, this is because if you are on the Open () server and you do not specify it, the system will by default allocate a thread pool to ServerSocketChannel for event handling. We can verify this by opening JConsole.
channel = AsynchronousServerSocketChannel.open();
Copy the code
Thread-0 through Thread-4 are the default thread pools allocated by the system to handle I/O events. (Godsend lick dog)
Imagine that if we blocked all threads while processing I/O events, the entire system’s I/O would be blocked, as shown in the figure below.
So threads that handle I/O events are better off handling only I/O events (receiving new connections, copying data from the kernel into threads)
As you can see, it’s best for the dog lick to do what the dog lick is supposed to do, which is heavy work, and the core business or events that block are best handed over to the girl (the business logic processing thread pool).
Tomcat NIO2 model
The key class org.apache.tomcat.util.net.Nio2Endpoint
Since we are addressing the NIO2 processing model, it is important to understand the following key players
-
A Nio2Acceptor is not bound to a particular thread. Instead, an Acceptor selects a thread from the thread pool to execute an Acceptor’s code when a new connection arrives
-
LimitLatch Limits the number of connections. The main way to limit the number of connections in asynchronous I/O cases is by locking the threads in the thread pool used for I/O events
-
The I/O processor is a class that handles I/O and runs in the same thread pool as niO2Acceptors
ServerSocket startup
The asynchronous ServerSocket startup process is a bit boring, but if you don’t want to look at the code, here’s how it starts
- Create a thread pool, wrapped as
AsynchronousChannelGroup
- Opens ServerSocket
- Bind ports and set the maximum number of connections
@Override
public void bind(a) throws Exception {
// Create a thread pool
if (getExecutor() == null) {
createExecutor();
}
if (getExecutor() instanceof ExecutorService) {
/ / create used in I/O thread pool (need to use AsynchronousChannelGroup packaging, to provide for AsynchronousServerSocketChanne)
threadGroup = AsynchronousChannelGroup.withThreadPool((ExecutorService) getExecutor());
}
// AsynchronousChannelGroup needs exclusive access to its executor service
if(! internalExecutor) { log.warn(sm.getString("endpoint.nio2.exclusiveExecutor"));
}
/ / create a ServerSocketChannel
serverSock = AsynchronousServerSocketChannel.open(threadGroup);
socketProperties.setProperties(serverSock);
InetSocketAddress addr = new InetSocketAddress(getAddress(), getPortWithOffset());
// Bind ports and set backlog parameters
// Backlog can be interpreted as the current maximum number of connections to be accepted
serverSock.bind(addr, getAcceptCount());
// Initialize SSL if needed
initialiseSsl();
}
Copy the code
This is the thread pool bound to the current asynchronous ServerSocketChannel,801 being the port on which the connector listens (see appendix for tutorials on enabling NIO2)
Nio2Acceptor
The main function of NIO2Acceptors is to accept new connections and limit the maximum number of connections. Because asynchronous I/O is used, acceptors are not bound to a specific thread. Instead, they select a task from the thread pool to execute when a new task is required. When a new connection arrives from a client, the program selects a thread from the thread pool to execute the Nio2Acceptor completed method and passes it to the client Socket to start executing the business logic for the new connection
AcceptHandler registered
In asynchronous I/O we need to register the Accept handler with ServerSocketChannel to handle connection events. As shown in the following code, when Tomcat is started, a thread is opened that calls Nio2SocketAcceptor’s run method. Register Nio2SocketAcceptor as the accept event handler for ServerSocketChannel
protected class Nio2Acceptor extends Acceptor<AsynchronousSocketChannel>
implements CompletionHandler<AsynchronousSocketChannel.Void> {...@Override
public void run(a) {
// The initial accept will be called in a separate utility thread
if(! isPaused()) {If the maximum number of connections is reached, the thread calling this method will be stuck waiting
try {
countUpOrAwaitConnection();
} catch (InterruptedException e) {
// Ignore
}
if(! isPaused()) {// Register yourself as a handler for the Accept event (note the interface of this implementation)
serverSock.accept(null.this);
} else{ state = AcceptorState.PAUSED; }}else{ state = AcceptorState.PAUSED; }}... }Copy the code
Handling of new connections
When a new connection arrives, the layer selects a thread from the thread pool to execute the Completed method and pass it to the client socket. The main flow of this method is as follows
- Check if the container is still running if it is still running continue the process
- Check whether the number of connections needs to be limited, and if so, select a thread from the thread pool to execute Acceptor’s run method (which may block)
- If the previous operations are complete, call setSocketOptions to process subsequent I/O events, and then the new connection is received
@Override
public void completed(AsynchronousSocketChannel socket, Void attachment) {
// Successful accept, reset the error delay
errorDelay = 0;
// Continue processing the socket on the current thread
// Configure the socket
if(isRunning() && ! isPaused()) {// Checks the maximum number of connections allowed. If not set (i.e. -1), the number of connections allowed is not allowed
if (getMaxConnections() == -1) {
serverSock.accept(null.this);
} else {
// Select a thread from the thread pool to increase the number of connections due to the arrival of new connections. This operation may block
getExecutor().execute(this);
}
// Perform subsequent I/O event processing
if (!setSocketOptions(socket)) {
closeSocket(socket);
}
} else {
if(isRunning()) { state = AcceptorState.PAUSED; } destroySocket(socket); }}Copy the code
Limit the implementation of the maximum number of connections
Because acceptors are not bound to a particular thread, if you want to limit the maximum number of connections, you need to block idle threads using locks. This is why you need to submit a new connection count to the thread pool when accepting a new connection. As shown below (that is, calling the run method of Nio2SocketAcceptor)
public void completed(AsynchronousSocketChannel socket, Void attachment) {... getExecutor().execute(this); . }Copy the code
In addition, remember when we created the ServerSocketChannel we set the backlog parameter?
This parameter is used to set the maximum number of unaccepted connections allowed by the current ServerSocket. This means that if the backlog for unaccepted connections exceeds the value set by the ServerSocket, all new connections will be discarded. (API)
I/O event processing
Since it is asynchronous I/O, the client Socket must register the CompletionHandler for reading and writing, so setSocketOptions must cause this step to occur. When does this step take place?
The Debug trace shows that setSocketOptions will result in the creation of Nio2SocketWrapper. The actual I/O process takes place in the readCompletionHandler created when the Nio2SocketWrapper object is created. Here is the code
The ReadCompletionHandler listens for read events and calls the processSocket method to begin parsing the data once it has been read
public Nio2SocketWrapper(Nio2Channel channel, final Nio2Endpoint endpoint) {
super(channel, endpoint);
nioChannels = endpoint.getNioChannels();
socketBufferHandler = channel.getBufHandler();
this.readCompletionHandler = new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer nBytes, ByteBuffer attachment) {
if (log.isDebugEnabled()) {
log.debug("Socket: [" + Nio2SocketWrapper.this + "], Interest: [" + readInterest + "]");
}
readNotify = false;
Other threads may modify the flag bit
synchronized (readCompletionHandler) {
//nBytes Indicates the number of bytes read, if less than 0
// Throw an EOF exception, no data is read, so what do you do
if (nBytes.intValue() < 0) {
failed(new EOFException(), attachment);
} else {
if(readInterest && ! Nio2Endpoint.isInline()) { readNotify =true;
} else {
// Release here since there will be no
// notify/dispatch to do the release.
readPending.release();
}
readInterest = false; }}if (readNotify) {
// Handle read events
getEndpoint().processSocket(Nio2SocketWrapper.this, SocketEvent.OPEN_READ, false); }}// Omit the code, the back is too long.Copy the code
Debug to verify, as shown in the figure below, attachment is the data we read
Note that IDEA may issue a request to switch threads during debugging (read data and previous operations are not on the same thread, as shown below)
conclusion
Tomcat NIO2 model
-
Accept events and I/O events share a thread pool and are not tied to a specific thread
-
Acceptor(Nio2Acceptor) is used to accept new connections and register handlers for I/O events
-
LimitLatch implements connection number limits by blocking threads in the thread pool
-
I/O handlers are read and write processors registered with Nio2SocketWrapper. When an I/O event arrives, the program selects a thread to execute the code for these processors
-
The overall process is as follows: a new connection arrives -> select a thread to execute Nio2Acceptor’s code -> submit a task to the thread pool to increase the number of connections -> register a read/write event ->I/O event arrives and select a thread to handle the I/O event
Thought the migration
Do not use the default thread pool When an asynchronous ServerSocketChannel is created, Tomcat will create a thread pool instead of using the default provided thread pool, since the thread pool is in our control, thus implementing the connection number limit function
Do not block THE I/O thread the I/O thread must have a subchild of the I/O thread. Do not perform operations on the I/O thread that would block for a long time
Appendix How to Debug Tomcat
As most backend programmers know, Tomcat is embedded in SpringBoot (and jetty, depending on how you choose), so we can create a New SpringBoot application that is dedicated to learning tomcat’s source code.
The following is the process of Tomcat debugging
- Step 1: Open IDEA
- The second step is to create a SpringBoot project
- Third, find the Tomcat JAR package in the project sidebar with Ctrl+F
- Step 4, /mute all, put on your headphones and press a break point
If you want to test the NIO handling of Tomcat, use the following class interruption points (see my understanding if you want to understand how NIO handles Tomcat)
package org.apache.tomcat.util.net;
public class NioEndpoint extends AbstractJsseEndpoint<NioChannel.SocketChannel> {...public class Poller implements Runnable {
public void run(a) {
// The code for this method is on line 692}}... }Copy the code
If you want to test Tomcat’s NIO2 handling, you need the following configuration to add the following code to your code. (Since the default I/O mode of Tomcat embedded in SpringBoot is NIO, we need to configure the connector of NIO2)
import org.apache.catalina.connector.Connector;
import org.springframework.boot.web.embedded.tomcat.TomcatServletWebServerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ConnectorConf {
// Pay attention to your SpringBoot version, this project version is 2.2.0, older version 1.5 uses a different class configuration
@Bean
public TomcatServletWebServerFactory servletContainer(a) {
TomcatServletWebServerFactory tomcatServletWebServerFactory =
new TomcatServletWebServerFactory();
tomcatServletWebServerFactory.addAdditionalTomcatConnectors(getConnector());
return tomcatServletWebServerFactory;
}
private Connector getConnector(a) {
// Key points
Connector connector = new Connector("org.apache.coyote.http11.Http11Nio2Protocol");
// Set the connector port to 801, so that access to 801 is NIO2 mode
connector.setPort(801);
returnconnector; }}Copy the code
To org.apache.tomcat.util.net.Nio2Endpoint hit a breakpoint is finished
How to debug multithreading
In the case of multi-threading, the breakpoint may not be accessed. In this case, just right-click on the breakpoint and select Thread. IDEA sends a notification when other threads reach the breakpoint, as shown in the following figure