Overview

Note: NIO2 (AIO) is asynchronous I/O.

Introduction to NIO2

AIO and NIO2 are the same thing. Like the Monkey King, Sun Wukong, who goes by several names, the labels differ but the essence is the same.

So how should you understand this concept? Here is an example.

Suppose a girl has plenty of "spare tires" (backup admirers, the "lick dogs" of internet slang). What is the easiest and most efficient way for her to get something done?

The answer: with so many "lick dogs" around, hand the work to them. While one of them is busy with a task, does the girl wait for him to finish? Of course not. She keeps assigning other tasks to the others in the meantime. If something comes up during a task that needs her attention, he simply notifies her and she deals with it.

Naturally, the "lick dogs" do the heavy lifting, such as copying data, performing I/O, and accepting new connections. The girl focuses on the core business.

In this analogy, the girl is the core business thread that processes the business logic, while the "lick dogs" are an abstraction of the kernel plus the I/O threads.

P.S.

  • If you already know NIO2, skip the NIO2 DEMO chapter and go straight to the Tomcat NIO2 model chapter (time is precious).

  • You can also skip every chapter and read only the conclusion, or read the appendix and start debugging the code right away.

NIO2 DEMO

The core idea of NIO2 is that the kernel is responsible for notifying the application when something happens, while the application still has to provide the threads that accept connections and copy data.

talk is cheap, show me your hair

If you want to learn about NIO2, you can click Learn

The comments in the source code are GBK-encoded. If they look garbled, switch your editor's encoding to GBK.

This is a demo. Note that although the example does not explicitly create a thread pool, calling open() on the server channel without specifying one makes the system assign a default thread pool to the AsynchronousServerSocketChannel for event handling. We can verify this by opening JConsole.

channel = AsynchronousServerSocketChannel.open();

Thread-0 through Thread-4 are the threads of the default thread pool that the system allocates to handle I/O events. (The "lick dogs", a gift from heaven.)
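To see the default pool at work without JConsole, here is a minimal sketch (not the demo from this article). The class name DefaultGroupDemo and the method acceptOnce are made up for illustration. It opens a server channel without a group, connects one local client, and reports which thread ran the accept handler.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the JDK or Tomcat.
class DefaultGroupDemo {

    /** Accepts one connection and returns the name of the thread that ran the handler. */
    static String acceptOnce() throws Exception {
        CompletableFuture<String> handlerThread = new CompletableFuture<>();
        // No AsynchronousChannelGroup is passed, so the JDK's default group is used.
        try (AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open()) {
            server.bind(new InetSocketAddress("127.0.0.1", 0)); // port 0: any free port
            server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() {
                @Override
                public void completed(AsynchronousSocketChannel accepted, Void attachment) {
                    handlerThread.complete(Thread.currentThread().getName());
                    try {
                        accepted.close();
                    } catch (IOException ignored) {
                        // Nothing useful to do in a demo
                    }
                }

                @Override
                public void failed(Throwable exc, Void attachment) {
                    handlerThread.completeExceptionally(exc);
                }
            });
            // Connect a client so that the pending accept completes.
            try (AsynchronousSocketChannel client = AsynchronousSocketChannel.open()) {
                client.connect(server.getLocalAddress()).get(5, TimeUnit.SECONDS);
                return handlerThread.get(5, TimeUnit.SECONDS);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("main thread:    " + Thread.currentThread().getName());
        System.out.println("handler thread: " + acceptOnce());
    }
}
```

The handler thread printed here should be a pool thread rather than main, which matches what JConsole shows. The exact thread names depend on the JDK.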

Imagine that we block every one of these threads while handling I/O events: the I/O of the entire system stalls, as shown in the figure below.

So the threads that handle I/O events should do nothing but handle I/O events (accepting new connections and copying data from the kernel into the application).

As you can see, the "lick dogs" should stick to what they are meant to do, the heavy work, while the core business and anything that may block are best handed to the girl (the thread pool that processes business logic).
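The hand-off can be sketched without any sockets. In this simulation (an assumption for illustration, not Tomcat code), the single-thread pool "io-0" plays the I/O thread and the "business" pool does the slow work. HandOffDemo and process are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical simulation: no real I/O happens here.
class HandOffDemo {

    /** Simulates an I/O callback that forwards its payload to a business pool. */
    static String process(String payload) throws Exception {
        ExecutorService ioPool = Executors.newSingleThreadExecutor(r -> new Thread(r, "io-0"));
        ExecutorService businessPool =
                Executors.newFixedThreadPool(2, r -> new Thread(r, "business"));
        try {
            CompletableFuture<String> result = new CompletableFuture<>();
            // This task stands in for a completion handler running on an I/O thread.
            ioPool.execute(() ->
                    // Do not block here: submit the slow work and return immediately.
                    businessPool.execute(() -> {
                        try {
                            TimeUnit.MILLISECONDS.sleep(50); // stand-in for slow business logic
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                        result.complete(Thread.currentThread().getName() + " handled " + payload);
                    }));
            return result.get(5, TimeUnit.SECONDS);
        } finally {
            ioPool.shutdown();
            businessPool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(process("ping"));
    }
}
```

Because the I/O task only submits work and returns, the single I/O thread is free again almost immediately, even though the business logic takes 50 ms.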

Tomcat NIO2 model

The key class is org.apache.tomcat.util.net.Nio2Endpoint.

To understand the NIO2 processing model, you need to know the following key players:

  • Nio2Acceptor is not bound to a particular thread. Instead, when a new connection arrives, a thread is picked from the thread pool to execute the acceptor's code

  • LimitLatch limits the number of connections. With asynchronous I/O, the limit is enforced mainly by blocking threads of the pool that is used for I/O events

  • The I/O handlers are the classes that handle I/O. They run in the same thread pool as Nio2Acceptor

ServerSocket startup

The startup of the asynchronous ServerSocket is fairly dull. If you do not want to read the code, here is how it starts:

  • Create a thread pool and wrap it in an AsynchronousChannelGroup
  • Open the ServerSocket
  • Bind the port and set the backlog (the maximum number of connections waiting to be accepted)

    @Override
    public void bind() throws Exception {

        // Create a thread pool
        if (getExecutor() == null) {
            createExecutor();
        }
        if (getExecutor() instanceof ExecutorService) {
            // Create the thread pool used for I/O (it must be wrapped in an
            // AsynchronousChannelGroup before it can be given to AsynchronousServerSocketChannel)
            threadGroup = AsynchronousChannelGroup.withThreadPool((ExecutorService) getExecutor());
        }
        // AsynchronousChannelGroup needs exclusive access to its executor service
        if (!internalExecutor) {
            log.warn(sm.getString("endpoint.nio2.exclusiveExecutor"));
        }
        // Create the ServerSocketChannel
        serverSock = AsynchronousServerSocketChannel.open(threadGroup);
        socketProperties.setProperties(serverSock);
        InetSocketAddress addr = new InetSocketAddress(getAddress(), getPortWithOffset());
        // Bind the port and set the backlog parameter
        // The backlog is the maximum number of connections waiting to be accepted
        serverSock.bind(addr, getAcceptCount());

        // Initialize SSL if needed
        initialiseSsl();
    }

This is the thread pool bound to the current asynchronous ServerSocketChannel; 801 is the port on which the connector listens (see the appendix for how to enable NIO2).
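The same three steps can be tried outside Tomcat with plain JDK calls. This is a minimal sketch under assumptions: the pool size of 4, the loopback address, and the names StartupSketch and startAndStop are chosen for illustration and are not Tomcat's values.

```java
import java.net.InetSocketAddress;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch of the startup steps, not Tomcat's Nio2Endpoint.
class StartupSketch {

    /** Starts a server on a free port with the given backlog, then stops it. Returns the port. */
    static int startAndStop(int backlog) throws Exception {
        // Step 1: create a thread pool and wrap it in an AsynchronousChannelGroup.
        ExecutorService executor = Executors.newFixedThreadPool(4);
        AsynchronousChannelGroup group = AsynchronousChannelGroup.withThreadPool(executor);
        // Step 2: open the server channel inside that group.
        try (AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open(group)) {
            // Step 3: bind the port and set the backlog (port 0 lets the OS pick a free port).
            server.bind(new InetSocketAddress("127.0.0.1", 0), backlog);
            return ((InetSocketAddress) server.getLocalAddress()).getPort();
        } finally {
            // Shutting down the group also shuts down the executor it owns.
            group.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("bound to port " + startAndStop(100));
    }
}
```

The group takes over the executor, which is why Tomcat logs a warning when the executor is not its own internal one.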

Nio2Acceptor

The main job of Nio2Acceptor is to accept new connections and enforce the maximum number of connections. Because asynchronous I/O is used, the acceptor is not bound to a specific thread; a thread is picked from the thread pool whenever there is work to do. When a new connection arrives from a client, a pool thread executes Nio2Acceptor's completed method, receives the client socket as an argument, and starts the processing for the new connection.

Registering the accept handler

With asynchronous I/O we need to register an accept handler on the ServerSocketChannel to handle connection events. As the following code shows, when Tomcat starts, a thread is started that calls Nio2Acceptor's run method, which registers the Nio2Acceptor itself as the accept handler of the ServerSocketChannel.

    protected class Nio2Acceptor extends Acceptor<AsynchronousSocketChannel>
            implements CompletionHandler<AsynchronousSocketChannel, Void> {
        ...
        @Override
        public void run() {
            // The initial accept will be called in a separate utility thread
            if (!isPaused()) {
                // If the maximum number of connections is reached,
                // the thread calling this method blocks here
                try {
                    countUpOrAwaitConnection();
                } catch (InterruptedException e) {
                    // Ignore
                }
                if (!isPaused()) {
                    // Register this object as the handler for accept events
                    // (note the interface this class implements)
                    serverSock.accept(null, this);
                } else {
                    state = AcceptorState.PAUSED;
                }
            } else {
                state = AcceptorState.PAUSED;
            }
        }
        ...
    }
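The pattern of a handler that registers itself again can be shown in isolation. The sketch below is not Tomcat's Nio2Acceptor: ReArmingAcceptor and acceptAll are hypothetical names, and it omits pausing and connection limits. It relies on the JDK rule that only one accept may be pending on a channel at a time.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical accept handler that re-registers itself after every connection.
class ReArmingAcceptor implements CompletionHandler<AsynchronousSocketChannel, Void> {
    private final AsynchronousServerSocketChannel server;
    private final CountDownLatch remaining;

    ReArmingAcceptor(AsynchronousServerSocketChannel server, CountDownLatch remaining) {
        this.server = server;
        this.remaining = remaining;
    }

    @Override
    public void completed(AsynchronousSocketChannel client, Void attachment) {
        // Only one accept may be pending, so register again after each completion.
        server.accept(null, this);
        try {
            client.close(); // a real server would start reading from the client here
        } catch (IOException ignored) {
            // Nothing useful to do in a demo
        }
        remaining.countDown();
    }

    @Override
    public void failed(Throwable exc, Void attachment) {
        // Reached when the server channel is closed while an accept is pending.
    }

    /** Connects the given number of clients and reports whether all were accepted. */
    static boolean acceptAll(int clients) throws Exception {
        try (AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open()) {
            server.bind(new InetSocketAddress("127.0.0.1", 0));
            SocketAddress address = server.getLocalAddress();
            CountDownLatch remaining = new CountDownLatch(clients);
            server.accept(null, new ReArmingAcceptor(server, remaining));
            for (int i = 0; i < clients; i++) {
                try (AsynchronousSocketChannel client = AsynchronousSocketChannel.open()) {
                    client.connect(address).get(5, TimeUnit.SECONDS);
                }
            }
            return remaining.await(5, TimeUnit.SECONDS);
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("all accepted: " + acceptAll(3));
    }
}
```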

Handling new connections

When a new connection arrives, the underlying layer picks a thread from the thread pool to execute the completed method and passes it the client socket. The main flow of the method is as follows:

  • Check whether the container is still running; if so, continue
  • Check whether the number of connections needs to be limited; if so, submit the acceptor to the thread pool so that a pool thread executes its run method (which may block)
  • Call setSocketOptions to set up the subsequent I/O event handling; at this point the new connection has been accepted

        @Override
        public void completed(AsynchronousSocketChannel socket, Void attachment) {
            // Successful accept, reset the error delay
            errorDelay = 0;
            // Continue processing the socket on the current thread
            // Configure the socket
            if (isRunning() && !isPaused()) {
                // Check the maximum number of connections.
                // If it is not set (i.e. -1), the number of connections is unlimited
                if (getMaxConnections() == -1) {
                    serverSock.accept(null, this);
                } else {
                    // Submit this acceptor to the thread pool; its run method counts
                    // the new connection and may block
                    getExecutor().execute(this);
                }
                // Perform subsequent I/O event processing
                if (!setSocketOptions(socket)) {
                    closeSocket(socket);
                }
            } else {
                if (isRunning()) {
                    state = AcceptorState.PAUSED;
                }
                destroySocket(socket);
            }
        }

Implementing the maximum connection limit

Because the acceptor is not bound to a particular thread, limiting the maximum number of connections means blocking a pool thread on a lock. This is why a connection-counting task is submitted to the thread pool whenever a new connection is accepted, as shown below (that is, Nio2Acceptor's run method gets called).

    public void completed(AsynchronousSocketChannel socket, Void attachment) {
        ...
        getExecutor().execute(this);
        ...
    }
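The blocking counter can be approximated with a plain Semaphore. This is a stand-in for illustration only: Tomcat's LimitLatch is its own class, and the ConnectionLimiter name and its methods below are hypothetical.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a connection-limiting latch, built on a Semaphore.
class ConnectionLimiter {
    private final Semaphore permits;

    ConnectionLimiter(int maxConnections) {
        this.permits = new Semaphore(maxConnections);
    }

    /** Counts one connection, blocking the calling thread while the limit is reached. */
    void countUpOrAwait() throws InterruptedException {
        permits.acquire();
    }

    /** Like countUpOrAwait, but gives up after the timeout. Returns true if counted. */
    boolean tryCountUp(long timeout, TimeUnit unit) throws InterruptedException {
        return permits.tryAcquire(timeout, unit);
    }

    /** Call when a connection closes so that a waiting thread can proceed. */
    void countDown() {
        permits.release();
    }

    public static void main(String[] args) throws InterruptedException {
        ConnectionLimiter limiter = new ConnectionLimiter(2);
        limiter.countUpOrAwait();
        limiter.countUpOrAwait();
        // The limit of 2 is reached, so a third connection cannot be counted yet.
        System.out.println(limiter.tryCountUp(50, TimeUnit.MILLISECONDS)); // false
        limiter.countDown(); // one connection closes
        System.out.println(limiter.tryCountUp(50, TimeUnit.MILLISECONDS)); // true
    }
}
```

The thread that calls countUpOrAwait is the one that blocks, which is why the counting runs as a separate task on a pool thread instead of inside the accept callback.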

Also, remember the backlog parameter we set when binding the ServerSocketChannel?

This parameter sets the maximum number of connections that may be waiting, not yet accepted, on the current ServerSocket. If the number of pending connections exceeds that value, new connections may be refused or dropped; the exact behavior is implementation specific. (See the API documentation.)

I/O event processing

Since this is asynchronous I/O, a CompletionHandler for reads and writes must be registered on the client socket, so setSocketOptions must lead to that step. When does it happen?

Tracing with the debugger shows that setSocketOptions leads to the creation of a Nio2SocketWrapper. The actual I/O handling takes place in the readCompletionHandler that is created in the Nio2SocketWrapper constructor. Here is the code.

The readCompletionHandler handles read completion events and calls processSocket to begin parsing the data once it has been read.

        public Nio2SocketWrapper(Nio2Channel channel, final Nio2Endpoint endpoint) {
            super(channel, endpoint);
            nioChannels = endpoint.getNioChannels();
            socketBufferHandler = channel.getBufHandler();

            this.readCompletionHandler = new CompletionHandler<Integer, ByteBuffer>() {
                @Override
                public void completed(Integer nBytes, ByteBuffer attachment) {
                    if (log.isDebugEnabled()) {
                        log.debug("Socket: [" + Nio2SocketWrapper.this + "], Interest: [" + readInterest + "]");
                    }
                    readNotify = false;
                    // Other threads may modify the flags
                    synchronized (readCompletionHandler) {
                        // nBytes is the number of bytes read. If it is less than 0,
                        // no data was read (end of stream), so fail with an EOFException
                        if (nBytes.intValue() < 0) {
                            failed(new EOFException(), attachment);
                        } else {
                            if (readInterest && !Nio2Endpoint.isInline()) {
                                readNotify = true;
                            } else {
                                // Release here since there will be no
                                // notify/dispatch to do the release.
                                readPending.release();
                            }
                            readInterest = false;
                        }
                    }
                    if (readNotify) {
                        // Handle read events
                        getEndpoint().processSocket(Nio2SocketWrapper.this, SocketEvent.OPEN_READ, false);
                    }
                }
                // The rest of the code is omitted because it is too long

Debugging confirms this: as shown in the figure below, attachment holds the data that was read.
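The role of the attachment can be reproduced with the plain JDK API. In this sketch (ReadHandlerDemo and readOnce are hypothetical names, not Tomcat code) the buffer is passed both as the read target and as the attachment, so the handler finds the data in it. It assumes that a single read delivers the whole short message, which is reasonable on loopback but not guaranteed in general.

```java
import java.io.EOFException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical demo of a read CompletionHandler; not Tomcat's Nio2SocketWrapper.
class ReadHandlerDemo {

    /** Sends the message over a loopback connection and returns what the read handler saw. */
    static String readOnce(String message) throws Exception {
        CompletableFuture<String> received = new CompletableFuture<>();
        try (AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open();
             AsynchronousSocketChannel client = AsynchronousSocketChannel.open()) {
            server.bind(new InetSocketAddress("127.0.0.1", 0));
            Future<AsynchronousSocketChannel> pending = server.accept();
            client.connect(server.getLocalAddress()).get(5, TimeUnit.SECONDS);
            try (AsynchronousSocketChannel accepted = pending.get(5, TimeUnit.SECONDS)) {
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                // The buffer is both the read target and the attachment.
                accepted.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
                    @Override
                    public void completed(Integer nBytes, ByteBuffer attachment) {
                        if (nBytes < 0) { // end of stream: nothing was read
                            failed(new EOFException(), attachment);
                            return;
                        }
                        attachment.flip(); // switch the buffer from writing to reading
                        received.complete(StandardCharsets.UTF_8.decode(attachment).toString());
                    }

                    @Override
                    public void failed(Throwable exc, ByteBuffer attachment) {
                        received.completeExceptionally(exc);
                    }
                });
                byte[] bytes = message.getBytes(StandardCharsets.UTF_8);
                client.write(ByteBuffer.wrap(bytes)).get(5, TimeUnit.SECONDS);
                return received.get(5, TimeUnit.SECONDS);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(readOnce("hello"));
    }
}
```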

Note that IDEA may prompt you to switch threads while debugging, because the data is read on a different thread from the earlier operations (as shown below).

Conclusion

Tomcat NIO2 model

  • Accept events and I/O events share one thread pool and are not tied to a specific thread

  • The acceptor (Nio2Acceptor) accepts new connections and registers the handlers for I/O events

  • LimitLatch enforces the connection limit by blocking threads in the thread pool

  • The I/O handlers are the read and write handlers registered by Nio2SocketWrapper. When an I/O event arrives, a thread is picked to execute the handler code

  • The overall flow is: a new connection arrives -> a thread is picked to execute Nio2Acceptor's code -> a task is submitted to the thread pool to count the connection -> the read/write handlers are registered -> an I/O event arrives and a thread is picked to handle it

Takeaways

Do not use the default thread pool. When it creates the asynchronous ServerSocketChannel, Tomcat creates its own thread pool rather than using the default one. Because that pool is under its control, it can implement the connection limit.

Do not block the I/O threads. An I/O thread should behave like an I/O thread: do not run operations on it that block for a long time.

Appendix: How to debug Tomcat

As most backend programmers know, Spring Boot embeds Tomcat (or Jetty, depending on your choice), so we can create a new Spring Boot application dedicated to studying Tomcat's source code.

The Tomcat debugging process is as follows:

  • Step 1: open IDEA
  • Step 2: create a Spring Boot project
  • Step 3: find the Tomcat JAR in the project sidebar with Ctrl+F
  • Step 4: /mute all, put on your headphones, and set a breakpoint

To explore Tomcat's NIO handling, set breakpoints in the following class (see my write-up on the topic if you want to understand how Tomcat handles NIO).

package org.apache.tomcat.util.net;

public class NioEndpoint extends AbstractJsseEndpoint<NioChannel, SocketChannel> {
    ...
    public class Poller implements Runnable {
        public void run() {
            // The code for this method is on line 692
        }
    }
    ...
}

To explore Tomcat's NIO2 handling, add the following configuration class to your project. (The Tomcat embedded in Spring Boot uses NIO by default, so we need to configure an NIO2 connector.)

import org.apache.catalina.connector.Connector;
import org.springframework.boot.web.embedded.tomcat.TomcatServletWebServerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ConnectorConf {
    // Mind your Spring Boot version: this project uses 2.2.0; the older 1.5 line configures this with a different class
    @Bean
    public TomcatServletWebServerFactory servletContainer() {
        TomcatServletWebServerFactory tomcatServletWebServerFactory =
                new TomcatServletWebServerFactory();
        tomcatServletWebServerFactory.addAdditionalTomcatConnectors(getConnector());
        return tomcatServletWebServerFactory;
    }

    private Connector getConnector() {
        // The key point: select the NIO2 protocol implementation
        Connector connector = new Connector("org.apache.coyote.http11.Http11Nio2Protocol");
        // Set the connector port to 801, so that requests to port 801 are served in NIO2 mode
        connector.setPort(801);
        return connector;
    }
}

Finally, set a breakpoint in org.apache.tomcat.util.net.Nio2Endpoint and you are done.

How to debug multithreading

With multiple threads, a breakpoint may seem never to be hit. In that case, right-click the breakpoint and select Thread as the suspend policy. IDEA then notifies you when other threads reach the breakpoint, as shown in the figure below.