Yesterday I went home by car, and I clocking a force button in the car. After finishing the work, I was idle and bored. I saw that I had collected Akka before, and vaguely remembered that it was an execution unit smaller than thread (so understood at that time). I happen to be suffering from a pile of code that WebFlux+Reactor has written. So I thought: Why doesn’t Java have such a thing that can be handled mindlessly by lightweight threads?

Then I saw that Java was pushing a project, whose name I forgot, to implement this function. But since it is implemented, it is currently not available, but saw Golang processing and Java NIO comparison, found that the essence of both let originally a thread to deal with one thing, into a thread to deal with multiple things. Golang coroutines aside, the implementation lies in the Golang custom scheduler; But NIO for Java we can talk about.

I seem to be limited to using NIO’s single-threaded, multi-threaded, master-slave multi-threaded Echo Server. Use Netty to write HelloWorld or something like that. I have not studied deeply all the time. When I think of the sign of “rigorous academic study” hanging on the dormitory bed (which has its own history), I feel ashamed of myself. After a good Google in my car time, I decided to write this article with my own understanding.

What is the I/O?

NIO/BIO/AIO, we have to understand what I/O is. I thought it was just reading and writing, but it turns out it’s not that simple.

I/O is literally translated as input/output. For example, read from disk/write to disk, receive data from network card/write data to network card, database query/add delete change database. All operations involving disks and networks are referred to as I/O operations. Let me just give you a general definition like that.

What is blocking?

Some conventions: If not specified, blocking here is all I/O blocking and does not include lock contention during synchronization. When a thread is blocked because it is locked, it is blocked because it is waiting for an I/O operation. It is blocked because it is trying to acquire a resource but has failed, and it cannot continue until the resource has been acquired.

Before looking at blocking, let’s look at the speed of each device:

CPU: 1ns, Register: 1NS, Cache: 10NS, Memory: 10US, Disk: 10ms, Network: 100ms

1s=1000ms, 1ms=1000us, 1us=1000ns, 1ns=1000ps.

I/O is a disk or network operation, and I/O operations are more than 1,000 times faster than memory operations, and more than 10^6 faster than the CPU, so any I/O operation is long in the CPU’s eyes.

Because of this, blocking on I/O operations causes the current thread to be scheduled into a waiting queue (see thread state switching) and then switched to the ready list to be scheduled when the data arrives.

Blocking is the process of a thread waiting for an I/O operation to complete. When a thread wants to read data from the network, it waits for the network card to be ready, then the data is transferred, and then the kernel copies the data into the user’s memory. Data transmission is complete after the network adapter is ready and all queued write requests are completed. This is a long process.

In network programming, blocking refers to this long network process where threads are suspended, placed in waiting queues, waiting for data to arrive, waiting for data to be written out. It’s going to stop there, and it’s not going to go down.

So let’s look at a picture.

The blue part is blocking, the blocking process during network operations.

Blocking/non-blocking IO? Synchronous/Asynchronous IO?

Ok, so now we know that blocking is when an I/O operation is too slow for the thread to continue executing because it is suspended.

A standard network read looks like this:

  • 1 ️ Nic data is received and placed in kernel space
  • 2 ️ The kernel copies data to user space.

A standard network would look like this:

  • 1 ️ Kernel copies data to kernel space
  • 2 ️ Nic reads data from kernel space and sends it out.

The basis for judging whether an operation is I/O blocking lies in whether the first ️ step is blocking; The basis for determining whether an IO is synchronous/asynchronous is whether step 2 ️ is blocked.

Does that make sense to you?

BIO/NIO/AIO

If you have written Socket communication, the process is basically like this, regardless of your language:

Create a ServerSocket=> set listening address => Accept a connection and return a Socket=> Continue listening, and the new thread processes the Socket just returned.

This is not bad, a very common Socket/ServerSocket server, right? This was the case with the early Tomcat.

Now let’s look at the threading part of the process, the part where new threads handle sockets. Why do we do this?

Because each Socket network read/write is a time-consuming process, if we do not open new threads, will block the following connections, so the overall system connection number, throughput will be greatly reduced.

One thread per connection seems like a good solution and a good solution to the problem of not having multiple connections. But with one thread per connection, will the system crash with a high number of connections? After all, Java threads are operating system threads, and the next Linux thread is close to a process, so creating and destroying a schedule is not trivial. Even if we have such a thing as thread pools, they are finite and maintenance is a burden. Is there a solution?

BIO over so far, the following is the introduction of NIO/AIO, need to explain here, NIO is blocking IO, referring to the each time reads waiting for data to be read, not like BIO instead returns, the return value represents a readable data size, unreadable to 1, so the NIO nonblocking are implemented here, NIO simply returns the read operation immediately, without waiting for the data to be read. The same is true for write operations.

Smart readers will immediately realize that nothing has changed since you have to continuously poll the return value to see if it is readable, and this will cause the CPU to idle, which is worse than the BIO. And so it is. So NIO just doesn’t block the program, but it doesn’t speed up the overall time.

So far we know that Socket processing requires threads because the network reads/writes are too slow (we’ll ignore the time-consuming operations in the business logic, such as querying the database, etc.) and the threads can’t do any work, so it blocks. In addition, we know that the CPU is not working while the program is blocked, so why not let another thread execute during that time? But this introduces a new problem, if I schedule another thread, how do I know when the data is ready, or when it’s writable?

B: well… Smell a trace of asynchronous + callback flavor. Since the program can’t do it, can we see what the operating system can do for us?

Linux provides SELECT, Poll, and ePoll, macOS provides KQueue, and Windows provides IOCP. They are I/O multiplexing technologies.

I/O multiplexing was mentioned, what does that mean?

Before I say that, let me introduce a few more concepts:

  • 1 ️ interruption: Interruption refers to the hardware of the computer other than the CPU, by placing a signal on the bus to remind the CPU that something has arrived. For example, a clock interrupt is sent out at a fixed frequency to tell the CPU the time interval; A disk interrupt is when the data is read and placed in the specified memory address. A network interrupt is when the data reaches the network adapter and is placed in the specified memory address.
  • 2 ️ interrupt handler: a small program that, when the interrupt is captured by the CPU, will call the corresponding interrupt handler according to the interrupt type; A specific clock interrupt can be used to trigger a thread scheduler, which is suitable for time-sharing systems.
  • 3 ️ file descriptor FD: Everything in Linux is a file, and the processing of any device consists of no more than four operations: opening, closing, reading and writing. So the file descriptor is the ID of an abstract file, which can be a file on disk, a remote process (IP :port), a keyboard, or a mouse.
  • 4 ️DMA: direct memory access, CPU assistant, for I/O device read and write is no longer fully responsible by the CPU, but by the CPU to DMA, DMA is responsible for, which is generally a chip on the motherboard.

The essence of I/O multiplexing is interrupt + corresponding interrupt handler => high level language encapsulation use.

Why all this talk? FDS were introduced to show that each Socket(which is essentially a remote process, and in the Internet world, IP:PORT can locate a process globally) has a unique FD that represents it. Here we use NIO’s SocketChannel instead of BIO’s Socket, both of which represent the Socket of the remote process.

Look directly at a graph to see how select works.

Poll, on the other hand, turns the fd_set implementation from an array to a linked list, so there’s no limit to the number.

Epoll? Is through the red black tree, two-way list, callback mechanism to achieve more efficient polling. You give 👴 a bit more specific! ?

Epoll has three system calls: epollc_reate(); epoll_ctl; Epoll_wait ().

Epoll_create (): Creates a space in memory for a red-black tree (this space is in physical memory, not virtual memory). The red-black tree is used to record inserts, deletes, and changes to fd. A two-way linked list is also created to hold the ready FD.

Epoll_ctl (): Adds a FD to the red-black tree, registers a driver with the kernel (some articles say the device), adds the FD to the ready list when it is available, and wakes up the epoll thread.

Epoll_wait (): Epoll iterates over the ready list, suspending if it is empty, and copying ready FDs into user space if it is not. In addition, for performance optimization, the physical space storing ready FDS is mapped to virtual memory in the same way as user space and kernel space.

In addition, if the number of connections is high, it is possible that the device will be constantly interrupted. In order to prevent the device from constantly calling, Linux uses an optimization trick to turn the interrupt into polling. When the number of connections is high and sockets are active, direct kernel manual polling for available FDS, which degenerates to polling, but may yield better performance. But notice that in general we say that epoll is based on interrupts, so that interrupts can be put into the ready table in one step instead of just knowing which device is available like select, which fd has to be polled all over again. Just because the poll is a linux-based optimization does not mean that ePoll is polling.

Now let’s see why select does not perform as well as epoll:

  • 1️ SELECT cannot know which FD is available, only which device is available, and it needs to traverse fd_set for judgment.
  • 2️ SELECT Requires replication of ready FD_set to user space, epoll is achieved by virtual memory remapping (inter-process communication — shared memory)

Now we leave the remote process readable/writable to the operating system, which uses nic interrupts for asynchronous notification. It frees up our program instead of having to wait.

Wait a minute. Freed up our program? Automatic read/write notifications available? How Nice it looks! Let’s try it with NIO.

Oh ho ho ~ NIO+I/O multiplexing =>Reactor model.

So the Reactor model is operating system dependent, which is actually hardware interrupt dependent. All hardware notifications throughout the computer are implemented by interrupts (as a matter of fact, other hardware interrupts are implemented by polling their own state, but CPU interrupts are also implemented by looking at pin signals each clock cycle).

NIO+I/O multiplexing in Java is ServerSocketChannel+SocketChannel+Selector+SelectionKey+Buffer. Consider a typical Echo server based on NIO+I/O multiplexing.

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.channels.spi.AbstractSelectableChannel;
import java.nio.charset.StandardCharsets;
import java.util.*;

/ * * *@author CodeWithBuff
 */
public class NioTcpSingleThread {

    public static void main(String[] args) {
        NioTcpSingleThread.Server.builder().build().run();
    }

    private static final HashMap<SocketChannel, List<DataLoad>> dataLoads = new LinkedHashMap<>();

    private static final ByteBuffer byteBuffer = ByteBuffer.allocate(1024 * 1024);

    @Data
    @Builder
    @NoArgsConstructor
    @AllArgsConstructor
    private static class DataLoad {
        private int intValue;

        private long longValue;

        private double doubleValue;

        private String stringValue;

        private int[] intArray;

        private long[] longArray;

        private double[] doubleArray;

        private String[] stringArray;
    }

    /** * Java NIO handles network core components only four: {@linkThe Channel}, {@linkThe Selector}, {@linkSelectionKey} and {@linkJava.nio. Buffer} * <br/> * Say {@linkServerSocketChannel}, {@linkSocketChannel}, {@linkThe Selector} and {@linkRelation between SelectionKey}. * <br/> * {@linkServerSocketChannel} and {@linkSocketChannel {SocketChannel} {SocketChannel} {SocketChannel} {SocketChannel} {SocketChannel}@linkAbstractSelectableChannel}, that is, it both inherited classes. * <br/> * {@linkSelector#select()} calls the system call, polls the port, and records the registered {@linkAbstractSelectableChannel} events of interest, in the event of all registered {@linkAbstractSelectableChannel} interested in one of the events, it returns. Otherwise it blocks. * <br/> * For {@linkHow to let {AbstractSelectableChannel},@linkHow about Selector} for recording and polling the events that you're interested in? The answer is: register at {@linkSelector}, and set the event type of interest. * <br/>@linkA variable of type SelectionKey} that allows you to manipulate {@linkAbstractSelectableChannel} and {@linkThe Selector}. {@linkSelectionKey} itself is {@linkAbstractSelectableChannel} and it registered to {@linkThe credentials of Selector}. *, like an order, records the relationship between the two, so after a successful registration, usually use {@linkSelectionKey} to achieve. At the same time, {@linkSelectionKey} also has an attachment() method that gets the object attached to it. * Normally we use this object to process the current {@linkSelectionKey} contains {@linkAbstractSelectableChannel} and {@linkThe actual business of Selector}. <br/> <br/>@linkSelector#select()}, which blocks until an event of interest occurs, but sometimes we can be sure that an event is happening right now or has already happened, so we can call {@linkSelector#wakeup()}@linkSelector#select()} returns immediately, then gets * {@linkSelectionKey {SelectionKey}@linkSelector#select()}(this is already the next loop) * <br/> * <br/> * Pay attention!! If a {@linkIn the same {AbstractSelectableChannel}@linkTwo different event types of interest are registered on the Selector}, and the two {@linkSelectionKey} is irrelevant. Although you can use the {@linkSelectionKey} * {@linkAbstractSelectableChannel} event types of interest. {@linkSelectionKey} is only returned at registration, so (Channel + Selector) = SelectionKey. But gee, registering more than one Selector will get stuck, so never register more than one Channel and the same Selector!! * /
    @Builder
    private static class Server implements Runnable {

        @Override
        public void run(a) {
            System.out.println("Server running...");
            Selector globalSelector;
            ServerSocketChannel serverSocketChannel;
            SelectionKey serverSelectionKey;
            try {
                globalSelector = Selector.open();
                serverSocketChannel = ServerSocketChannel.open();
                serverSocketChannel.bind(new InetSocketAddress(8190));
                serverSocketChannel.configureBlocking(false);
                serverSelectionKey = serverSocketChannel.register(globalSelector, SelectionKey.OP_ACCEPT);
                serverSelectionKey.attach(Acceptor.builder()
                        .globalSelector(globalSelector)
                        .serverSocketChannel(serverSocketChannel)
                        .build()
                );
                while (true) {
                    // Select () is a serious blocking method that blocks until one of any registered events of interest to the SocketChannel has occurred. For example, a new connection is established, the Channel can be read, or the Channel can be written
                    // Its return value indicates that there are several events of interest, which are not really useful, so it is simply ignored here
                    globalSelector.select();
                    Set<SelectionKey> selectionKeySet = globalSelector.selectedKeys();
                    for(SelectionKey selectionKey : selectionKeySet) { dispatch(selectionKey); selectionKeySet.remove(selectionKey); }}}catch (IOException ignored) {
            }
        }

        private void dispatch(SelectionKey selectionKey) { Runnable runnable = (Runnable) selectionKey.attachment(); runnable.run(); }}@Data
    @Builder
    private static class Acceptor implements Runnable {

        private final Selector globalSelector;

        private final ServerSocketChannel serverSocketChannel;

        @Override
        public void run(a) {
            try {
                SocketChannel socketChannel = serverSocketChannel.accept();
                System.out.println("Connection established...");
                socketChannel.configureBlocking(false);
                SelectionKey socketSelectionKey = socketChannel.register(globalSelector, SelectionKey.OP_READ);
                socketSelectionKey.attach(Handler.builder()
                        .socketSelectionKey(socketSelectionKey)
                        .build()
                );
                // The Channel of interest for reading is registered, so to quickly start reading, wake up the selector directly. I'm ready, I think you have the data, so just go back.
                globalSelector.wakeup();
            } catch (IOException ignored) {
            }
        }
    }

    / * * * "write" rely on "read" operation to read the data, so can't again after "write" on using a "writing", must be "read" or "closed". * < br / > * "read" after the operation can continue to "read" without having to wait for "write complete", so "finished" can be interested in the type is set to "read" | "write" instead of just "writing". * /
    @Data
    @Builder
    private static class Handler implements Runnable {

        private final SelectionKey socketSelectionKey;

        @Override
        public void run(a) {
            SocketChannel socketChannel = (SocketChannel) socketSelectionKey.channel();
            if(! socketChannel.isOpen()) { System.out.println("Connection closed");
                try {
                    socketChannel.shutdownInput();
                    socketChannel.shutdownOutput();
                    socketChannel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
                return ;
            }
            if (socketSelectionKey.isReadable()) {
                System.out.println("Read events occur, prepare to read...");
                Reader.builder()
                        .socketChannel(socketChannel)
                        .build()
                        .run();
                // The description is interested in both reading and writing (because the client may be long connected and send the message again), but the same SelectionKey can only be either read or write
                socketSelectionKey.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
                // After reading, prepare to write
                socketSelectionKey.selector().wakeup();
            }
            if (socketSelectionKey.isWritable()) {
                System.out.println("Write about the event, prepare to write...");
                Writer.builder()
                        .socketChannel(socketChannel)
                        .build()
                        .run();
                socketSelectionKey.interestOps(SelectionKey.OP_READ);
                // If you have finished writing, it is not necessary to return immediately
                // socketSelectionKey.selector().wakeup();}}}@Data
    @Builder
    private static class Reader implements Runnable {

        private final SocketChannel socketChannel;

        @Override
        public void run(a) {
            try {
                byteBuffer.clear();
                int readable = socketChannel.read(byteBuffer);
                byte[] bytes = byteBuffer.array();
                String value = new String(bytes, 0, readable);
                System.out.println("读到了: " + value);
                DataLoad dataLoad = DataLoad.builder()
                        .stringValue(value)
                        .build();
                List<DataLoad> tmp = dataLoads.computeIfAbsent(socketChannel, k -> new LinkedList<>());
                tmp.add(dataLoad);
            } catch (IOException ignored) {
            }
        }
    }

    @Data
    @Builder
    private static class Writer implements Runnable {

        private final SocketChannel socketChannel;

        @Override
        public void run(a) {
            try {
                String value = "Server get: " + dataLoads.get(socketChannel).get(0).getStringValue();
                dataLoads.get(socketChannel).remove(0);
                socketChannel.write(ByteBuffer.wrap(value.getBytes(StandardCharsets.UTF_8)));
            } catch (IOException ignored) {
            }
        }
    }
}
Copy the code

NIO provides notification when the data is readable/writable, and then reads and writes, while AIO directly transfers the data to the specified area. In other words, it does not need to read or write the data itself. When it is called, the data is already ready. However, Linux and actual production use is not much, we will not mention it, just give an example code:

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReentrantLock;

/ * * *@author CodeWithBuff
 */
public class AioTcpSingleThread {

    public static void main(String[] args) {
        Server.builder().build().run();
        // Prevent the main thread from exiting
        LockSupport.park(Long.MAX_VALUE);
    }

    private static final ConcurrentHashMap<AsynchronousSocketChannel, LinkedBlockingQueue<DataLoad>> dataLoads = new ConcurrentHashMap<>();

    private static final ReentrantLock READ_LOCK = new ReentrantLock();

    private static final ReentrantLock WRITE_LOCK = new ReentrantLock();

    private static final ByteBuffer READ_BUFFER = ByteBuffer.allocate(1024 * 4);

    private static final ByteBuffer WRITE_BUFFER = ByteBuffer.allocate(1024 * 4);

    @Data
    @Builder
    @NoArgsConstructor
    @AllArgsConstructor
    private static class DataLoad {
        private int intValue;

        private long longValue;

        private double doubleValue;

        private String stringValue;

        private int[] intArray;

        private long[] longArray;

        private double[] doubleArray;

        private String[] stringArray;
    }

    @Builder
    private static class Server implements Runnable {

        @Override
        public void run(a) {
            try {
                System.out.println("Server start...");
                asynchronousServerSocketChannel = AsynchronousServerSocketChannel.open();
                asynchronousServerSocketChannel.bind(new InetSocketAddress(8190));
                asynchronousServerSocketChannel.accept(null, ACCEPTOR);
            } catch (IOException ignored) {
            }
        }
    }

    private static AsynchronousServerSocketChannel asynchronousServerSocketChannel = null;

    private static final Acceptor ACCEPTOR = new Acceptor();

    private static class Acceptor implements CompletionHandler<AsynchronousSocketChannel.Object> {
        // This method is called asynchronously, so don't worry about blocking to the main thread
        @Override
        public void completed(AsynchronousSocketChannel result, Object attachment) {
            System.out.println("Connection established:" + Thread.currentThread().getName());
            System.out.println("Connection established");
            dataLoads.computeIfAbsent(result, k -> new LinkedBlockingQueue<>());
            // Use a loop to read and write multiple times
            while (result.isOpen()) {
                READ_LOCK.lock();
                // This method is also asynchronous
                result.read(READ_BUFFER, attachment, new Reader(result, READ_BUFFER.array()));
                READ_BUFFER.clear();
                READ_LOCK.unlock();
                WRITE_LOCK.lock();
                String ans = "";
                try {
                    ans = "Server get: " + dataLoads.get(result).take().getStringValue();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                / / asynchronous
                result.write(ByteBuffer.wrap(ans.getBytes(StandardCharsets.UTF_8)), attachment, new Writer(result));
                WRITE_LOCK.unlock();
            }
            System.out.println("End the communication once.");
            // Try to establish a second wave communication
            asynchronousServerSocketChannel.accept(attachment, ACCEPTOR);
        }

        @Override
        public void failed(Throwable exc, Object attachment) {
            System.out.println("Failed to establish connection"); }}private static class Reader implements CompletionHandler<Integer.Object> {

        private final AsynchronousSocketChannel asynchronousSocketChannel;

        private final byte[] bytes;

        public Reader(AsynchronousSocketChannel asynchronousSocketChannel, byte[] bytes) {
            this.asynchronousSocketChannel = asynchronousSocketChannel;
            this.bytes = bytes;
        }

        @Override
        public void completed(Integer result, Object attachment) {
            System.out.println("Read data:" + Thread.currentThread().getName());
            if (result == 0| |! asynchronousSocketChannel.isOpen()) {return ;
            } else if (result < 0) {
                shutdown(asynchronousSocketChannel);
                return ;
            }
            System.out.println("Read data:" + result);
            String value = new String(bytes, 0, result);
            System.out.println("读到了: " + value);
            LinkedBlockingQueue<DataLoad> tmp = dataLoads.get(asynchronousSocketChannel);
            DataLoad dataLoad = DataLoad.builder()
                    .stringValue(value)
                    .build();
            tmp.add(dataLoad);
        }

        @Override
        public void failed(Throwable exc, Object attachment) {
            System.out.println("Read failed"); }}private static class Writer implements CompletionHandler<Integer.Object> {

        private final AsynchronousSocketChannel asynchronousSocketChannel;

        public Writer(AsynchronousSocketChannel asynchronousSocketChannel) {
            this.asynchronousSocketChannel = asynchronousSocketChannel;
        }

        @Override
        public void completed(Integer result, Object attachment) {
            System.out.println("Write data:" + Thread.currentThread().getName());
            if(! asynchronousSocketChannel.isOpen()) {return ;
            }
            System.out.println("Write data:" + result);
        }

        @Override
        public void failed(Throwable exc, Object attachment) {
            System.out.println("Write failed"); }}private static void shutdown(AsynchronousSocketChannel asynchronousSocketChannel) {
        try {
            asynchronousSocketChannel.shutdownInput();
            asynchronousSocketChannel.shutdownOutput();
            asynchronousSocketChannel.close();
        } catch (IOException ignore) {
        }
    }
}
Copy the code

What does NIO solve? What’s not resolved?

NIO enables one thread to manage I/O operations for multiple connections, rather than one thread per connection like BIO, which is implemented through interrupt mechanism + system calls. This allows all available I/O events to be processed in one thread.

Note that there is no difference between NIO and BIO if one Selector registers only one connection (generally speaking, one Selector for one thread, and one thread for multiple selectors reduces Selector efficiency).

Remember why we went from BIO to NIO? Because BIO cannot withstand large numbers of connections, NIO solves the problem of large connections.

However, NIO does not provide an increase in per-request speed, and keep in mind that even with a small number of connections, it is not as fast as BIO.

NIO use precautions

We have assumed that NIO does not require time-consuming business, but what if it does, such as database operations? Here we refer to the implementation of the NIO framework Netty.

Netty recommends that time-consuming operations be handled by the incoming custom thread pool, that is, submitting the task to the thread pool, then adding asynchronous calls, and when the task is finished, proceed to the next step. In short, time-consuming tasks are handled in the thread pool, while normal tasks are handled directly in the NIO thread.

In fact, this also corresponds to the multi-threaded Reactor model. In addition, there is a master-slave multithreaded Reactor, which is to separate the connection operation and make a single Selector to deal with the connection. The I/O operations after the connection are placed in other selectors, and the business is placed in the thread pool.

Take a look at the code for each of these models:

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.locks.ReentrantLock;

/ * * *@author CodewithBuff
 */
public class NioTcpMultiThread {

    public static void main(String[] args) {
        Server.builder().build().run();
        Runnable target = executorService::shutdown;
        Thread shutdown = new Thread(target);
        Runtime.getRuntime().addShutdownHook(shutdown);
    }

    private static final ConcurrentHashMap<SocketChannel, LinkedBlockingQueue<DataLoad>> dataLoads = new ConcurrentHashMap<>();

    private static final ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

    private static final ByteBuffer byteBuffer = ByteBuffer.allocate(1024 * 4);

    private static final ReentrantLock reentrantLock = new ReentrantLock();

    @Data
    @Builder
    @NoArgsConstructor
    @AllArgsConstructor
    private static class DataLoad {
        private int intValue;

        private long longValue;

        private double doubleValue;

        private String stringValue;

        private int[] intArray;

        private long[] longArray;

        private double[] doubleArray;

        private String[] stringArray;
    }

    @Builder
    private static class Server implements Runnable {

        @Override
        public void run(a) {
            System.out.println("Server running...");
            Selector globalSelector;
            ServerSocketChannel serverSocketChannel;
            SelectionKey serverSelectionKey;
            try {
                globalSelector = Selector.open();
                serverSocketChannel = ServerSocketChannel.open();
                serverSocketChannel.bind(new InetSocketAddress(8190));
                serverSocketChannel.configureBlocking(false);
                serverSelectionKey = serverSocketChannel.register(globalSelector, SelectionKey.OP_ACCEPT);
                serverSelectionKey.attach(Acceptor.builder()
                        .serverSelectionKey(serverSelectionKey)
                        .build()
                );
                while (true) {
                    int a = globalSelector.select();
                    Set<SelectionKey> selectionKeySet = globalSelector.selectedKeys();
                    for(SelectionKey selectionKey : selectionKeySet) { dispatch(selectionKey); selectionKeySet.remove(selectionKey); }}}catch (IOException ignored) {
            }
        }

        private void dispatch(SelectionKey selectionKey) { Runnable runnable = (Runnable) selectionKey.attachment(); runnable.run(); }}@Data
    @Builder
    private static class Acceptor implements Runnable {

        private final SelectionKey serverSelectionKey;

        @Override
        public void run(a) {
            ServerSocketChannel serverSocketChannel = (ServerSocketChannel) serverSelectionKey.channel();
            Selector globalSelector = serverSelectionKey.selector();
            SocketChannel socketChannel;
            try {
                socketChannel = serverSocketChannel.accept();
                System.out.println("Connection established...");
                socketChannel.configureBlocking(false);
                SelectionKey socketSelectionKey = socketChannel.register(globalSelector, SelectionKey.OP_READ);
                socketSelectionKey.attach(Handler.builder()
                        .socketSelectionKey(socketSelectionKey)
                        .build()
                );
                globalSelector.wakeup();
            } catch (IOException ignored) {
            }
        }
    }

    @Data
    @Builder
    private static class Handler implements Runnable {

        private final SelectionKey socketSelectionKey;

        @Override
        public void run(a) {
            if(! socketSelectionKey.channel().isOpen()) { System.out.println("Connection closed");
                try {
                    socketSelectionKey.channel().close();
                } catch (IOException ignored) {
                }
                return ;
            }
            dataLoads.computeIfAbsent((SocketChannel) socketSelectionKey.channel(), k -> new LinkedBlockingQueue<>());
            if (socketSelectionKey.isReadable()) {
                Reader reader = Reader.builder()
                        .socketSelectionKey(socketSelectionKey)
                        .build();
                Thread thread = new Thread(reader);
                socketSelectionKey.interestOps(SelectionKey.OP_WRITE);
                thread.start();
            } else if (socketSelectionKey.isWritable()) {
                Writer writer = Writer.builder()
                        .socketSelectionKey(socketSelectionKey)
                        .build();
                Thread thread = newThread(writer); socketSelectionKey.interestOps(SelectionKey.OP_READ); thread.start(); }}}@Data
    @Builder
    private static class Reader implements Runnable {

        private final SelectionKey socketSelectionKey;

        @Override
        public void run(a) {
            try {
                SocketChannel socketChannel = (SocketChannel) socketSelectionKey.channel();
                String value;
                reentrantLock.lock();
                if (socketChannel.isOpen()) {
                    int readable = socketChannel.read(byteBuffer);
                    if (readable == 0) {
                        value = null;
                        // system.out.println (" read empty request ");
                    } else if (readable < 0) {
                        value = null;
                        shutdownSocketChannel(socketChannel);
                    } else {
                        value = new String(byteBuffer.array(), 0, readable); }}else {
                    value = null;
                }
                reentrantLock.unlock();
                if (value == null) {
                    return ;
                }
                System.out.println("读到了: " + value);
                DataLoad dataLoad = DataLoad.builder()
                        .stringValue(value)
                        .build();
                LinkedBlockingQueue<DataLoad> tmp = dataLoads.computeIfAbsent(socketChannel, k -> new LinkedBlockingQueue<>());
                tmp.add(dataLoad);
                socketSelectionKey.selector().wakeup();
            } catch (IOException ignored) {
            }
        }
    }

    @Data
    @Builder
    private static class Writer implements Runnable {

        private final SelectionKey socketSelectionKey;

        @Override
        public void run(a) {
            try {
                SocketChannel socketChannel = (SocketChannel) socketSelectionKey.channel();
                LinkedBlockingQueue<DataLoad> queue = dataLoads.get(socketChannel);
                String value = "Server get: " + dataLoads.get(socketChannel).take().getStringValue();
                if (socketChannel.isOpen())
                    socketChannel.write(ByteBuffer.wrap(value.getBytes(StandardCharsets.UTF_8)));
                else{ shutdownSocketChannel(socketChannel); }}catch (IOException | InterruptedException ignored) {
            }
        }
    }

    private static void shutdownSocketChannel(SocketChannel socketChannel) {
        try {
            socketChannel.shutdownInput();
            socketChannel.shutdownOutput();
            socketChannel.close();
        } catch (IOException ignored) {
        }
    }
}
Copy the code

Master-slave multithreading:

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.ReentrantLock;

/ * * *@authorThe night of the thirteenth month */
public class NioTcpMainSubThread {

    public static void main(String[] args) throws IOException {
        new Server(Runtime.getRuntime().availableProcessors()).run();
    }

    private static final ConcurrentHashMap<SocketChannel, LinkedBlockingQueue<DataLoad>> dataLoads = new ConcurrentHashMap<>();

    private static final ByteBuffer byteBuffer = ByteBuffer.allocate(1024 * 4);

    private static final ReentrantLock reentrantLock = new ReentrantLock();

    @Data
    @Builder
    @NoArgsConstructor
    @AllArgsConstructor
    private static class DataLoad {
        private int intValue;

        private long longValue;

        private double doubleValue;

        private String stringValue;

        private int[] intArray;

        private long[] longArray;

        private double[] doubleArray;

        private String[] stringArray;
    }

    /** * BossSelector connects only to the ServerSocketChannel and is single-threaded, running in the main thread. * 

* Then throw the established connection to Workers, who are a group of Workers, each Worker has a separate WorkSelector to process the SocketChannel to which the current Worker is assigned. *

* The strategy here is to commit in sequence, so that each Worker is responsible for the same number of Socketchannels as possible. *

* Each Worker runs on a separate thread, only polling Read/Write operations, time-consuming business operations (such as I/O, Compute) are assigned to the thread workpool. * /
private static class Server implements Runnable { private final Selector bossSelector; private final int workerCount; public Server(int workerCount) throws IOException { this.workerCount = workerCount; bossSelector = Selector.open(); } @Override public void run(a) { try { System.out.println("Server start..."); ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.bind(new InetSocketAddress(8190)); serverSocketChannel.configureBlocking(false); SelectionKey serverSelectionKey = serverSocketChannel.register(bossSelector, SelectionKey.OP_ACCEPT); serverSelectionKey.attach(new Boss(serverSocketChannel, workerCount)); while (true) { bossSelector.select(); Set<SelectionKey> selectionKeySet = bossSelector.selectedKeys(); SelectionKey = SelectionKey = SelectionKey = SelectionKeySelectionKey key = selectionKeySet.iterator().next(); Runnable runnable = (Runnable) key.attachment(); runnable.run(); selectionKeySet.remove(key); }}catch (IOException ignored) { } } } /** * Process the new connection, generate a SocketChannel and select a Worker to commit. * / private static class Boss implements Runnable { private final ServerSocketChannel serverSocketChannel; private final int workerCount; private final Set<SocketChannel>[] socketChannelSets; private final Worker[] workers; private int index = 0; @SuppressWarnings("unchecked") public Boss(ServerSocketChannel serverSocketChannel, int workerCount) throws IOException { this.serverSocketChannel = serverSocketChannel; this.workerCount = workerCount; ExecutorService executorService = Executors.newFixedThreadPool(workerCount); socketChannelSets = new Set[workerCount]; workers = new Worker[workerCount]; for (int i = 0; i < workerCount; ++ i) { workers[i] = newWorker(); socketChannelSets[i] = workers[i].getSocketChannels(); executorService.submit(workers[i]); }}@Override public void run(a) { Set<SocketChannel> socketChannelSet = socketChannelSets[index]; Selector workerSelector = workers[index].getWorkerSelector(); ++ index; if (index == this.workerCount) index = 0; try { SocketChannel socketChannel = serverSocketChannel.accept(); System.out.println("Establish a connection..."); socketChannelSet.add(socketChannel); workerSelector.wakeup(); } catch (IOException ignore) { } } } /** * Processes the newly added SocketChannel and polls for Read/Write. * / private static class Worker implements Runnable { private final Selector workerSelector; private final Set<SocketChannel> socketChannels = new HashSet<>(); public Worker(a) throws IOException { workerSelector = Selector.open(); } @Override public void run(a) { while (true) { try { if (socketChannels.size() > 0) { for (SocketChannel socketChannel : socketChannels) { socketChannel.configureBlocking(false); SelectionKey selectionKey = socketChannel.register(workerSelector, SelectionKey.OP_READ); selectionKey.attach(new Handler(selectionKey)); socketChannels.remove(socketChannel); } System.out.println("New SocketChannel added"); } workerSelector.select(); Set<SelectionKey> selectionKeySet = workerSelector.selectedKeys(); for(SelectionKey key : selectionKeySet) { Runnable runnable = (Runnable) key.attachment(); runnable.run(); selectionKeySet.remove(key); }}catch (IOException ignored) { } } } public Set<SocketChannel> getSocketChannels(a) { return socketChannels; } public Selector getWorkerSelector(a) { returnworkerSelector; }}/** * Distribution service processing */ @Data @Builder private static class Handler implements Runnable { private final SelectionKey socketSelectionKey; private static final ExecutorService workPool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); @Override public void run(a) { if(! socketSelectionKey.channel().isOpen()) { System.out.println("Connection closed"); try { socketSelectionKey.channel().close(); } catch (IOException ignored) { } return ; } dataLoads.computeIfAbsent((SocketChannel) socketSelectionKey.channel(), k -> new LinkedBlockingQueue<>()); if (socketSelectionKey.isReadable()) { Reader reader = Reader.builder() .socketSelectionKey(socketSelectionKey) .build(); workPool.submit(reader); socketSelectionKey.interestOps(SelectionKey.OP_WRITE); } else if(socketSelectionKey.isWritable()) { Writer writer = Writer.builder() .socketSelectionKey(socketSelectionKey) .build(); workPool.submit(writer); socketSelectionKey.interestOps(SelectionKey.OP_READ); }}}@Data @Builder private static class Reader implements Runnable { private final SelectionKey socketSelectionKey; @Override public void run(a) { try { SocketChannel socketChannel = (SocketChannel) socketSelectionKey.channel(); String value; reentrantLock.lock(); if (socketChannel.isOpen()) { int readable = socketChannel.read(byteBuffer); if (readable == 0) { value = null; // system.out.println (" read empty request "); } else if (readable < 0) { value = null; shutdownSocketChannel(socketChannel); } else { value = new String(byteBuffer.array(), 0, readable); }}else { value = null; } reentrantLock.unlock(); if (value == null) { return ; } System.out.println("读到了: " + value); DataLoad dataLoad = DataLoad.builder() .stringValue(value) .build(); LinkedBlockingQueue<DataLoad> tmp = dataLoads.computeIfAbsent(socketChannel, k -> new LinkedBlockingQueue<>()); tmp.add(dataLoad); socketSelectionKey.selector().wakeup(); } catch (IOException ignored) { } } } @Data @Builder private static class Writer implements Runnable { private final SelectionKey socketSelectionKey; @Override public void run(a) { try { SocketChannel socketChannel = (SocketChannel) socketSelectionKey.channel(); LinkedBlockingQueue<DataLoad> queue = dataLoads.get(socketChannel); String value = "Server get: " + dataLoads.get(socketChannel).take().getStringValue(); if (socketChannel.isOpen()) socketChannel.write(ByteBuffer.wrap(value.getBytes(StandardCharsets.UTF_8))); else{ shutdownSocketChannel(socketChannel); }}catch (IOException | InterruptedException ignored) { } } } private static void shutdownSocketChannel(SocketChannel socketChannel) { try { socketChannel.shutdownInput(); socketChannel.shutdownOutput(); socketChannel.close(); } catch (IOException ignored) { } } } Copy the code

So here we know. NIO threads (Selector bound threads) only do I/O. No matter when your business logic is done and how long it takes, you just need the data to be ready and register a write event, then I NIO will notify you when it’s ready to write, and you write. In this way, I/O operations of other ServerSockets will not be affected as long as the time-consuming business is not in the NIO thread.

Finally, I want to be clear that the Selector for the Reactor model only implements data-readable/data-writable notifications (the Boss version might only do connectable notifications). We emphasize “available” here, meaning “available”, i.e. notification of the availability of I/O operations for a connection. So it needs to be used in conjunction with NIO to do NIO’s notification without NIO constantly polling.

If you want it to notify you of an operation, it’s very easy, just register with it and add a attachment implementation so that the Selector can call your callback when it’s available to do further operations, but don’t block your NIO Selector in that application. Failure to do so will affect subsequent I/O availability notifications for other connections.

reference

Implementation principle of Selector

Fully understand synchronous asynchronous blocking and non-blocking

Will I/O always occupy the CPU? – Answer by Zhao Xinlei – Zhihu

How does epoll or kqueue work? – The answer to everything about summer – Zhihu

The difference between select, poll and epoll

IO Multiplexing

Epoll,

How does epoll or kqueue work? – Xu Chen’s answer – Zhihu

25 pictures, 10,000 words, unpacking the Linux network packet sending process

High Concurrency Programming for Java

Modern Operating Systems

Understanding Computer Systems in Depth