1. Traditional blocking I/O
Blocking I/O means that the socket's read and write calls block: the calling thread waits until the operation can complete.
1.2 Blocking I/O programming model
    // Requires imports from java.io (BufferedReader, InputStreamReader, PrintWriter, IOException)
    // and java.net (ServerSocket, Socket).
    public static void main(String[] args) throws IOException {
        ServerSocket serverSocket = new ServerSocket(8091);
        System.out.println("step1: bind 8091");
        while (true) {
            // Blocks until a client connection is available
            Socket socket = serverSocket.accept();
            System.out.println("step2: accept " + socket.getPort());
            // One thread per connection
            new Thread(() -> {
                try (BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                     PrintWriter out = new PrintWriter(socket.getOutputStream(), true)) {
                    String line;
                    // Blocks until a full line arrives or the peer closes the connection
                    while ((line = reader.readLine()) != null) {
                        System.out.println(line);
                        out.println("Server recv:" + line);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
The socket's accept, read, and write calls all block synchronously. The main thread therefore loops on accept, which blocks until a TCP connection that has completed the handshake (ESTABLISHED state) is available, and hands each accepted connection to a new thread.
The read call copies data that is ready in the kernel buffer into the user process. If the kernel buffer is empty, the thread is suspended and releases the CPU. When data arrives in the kernel buffer, the CPU handles the I/O interrupt and the kernel wakes the blocked thread so that it can process the data.
While a connection is waiting on I/O, its thread is blocked, so a single-threaded server would stall there. The CPU itself is free during that time, however, so running multiple threads lets it do other work.
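The suspension described above can be observed with a small loopback experiment. The sketch below is not from the original article: the class name `BlockingReadDemo`, the ephemeral port, and the 200 ms wait are assumptions chosen for illustration. A reader thread calls `read()` on a connection that has no data yet, the main thread confirms that the reader has not returned, and only then sends one byte.

```java
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class BlockingReadDemo {

    /** Outcome of one run (hypothetical helper type for this sketch). */
    static final class Result {
        final boolean blockedBeforeWrite; // true if read() had not returned before data was sent
        final int byteRead;               // the byte that read() finally returned

        Result(boolean blockedBeforeWrite, int byteRead) {
            this.blockedBeforeWrite = blockedBeforeWrite;
            this.byteRead = byteRead;
        }
    }

    static Result run() throws Exception {
        // Port 0 asks the OS for any free port on the loopback interface.
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress());
             Socket client = new Socket(server.getInetAddress(), server.getLocalPort());
             Socket accepted = server.accept()) {

            CountDownLatch done = new CountDownLatch(1);
            AtomicInteger received = new AtomicInteger(-2);

            Thread reader = new Thread(() -> {
                try {
                    InputStream in = accepted.getInputStream();
                    // Blocks here: the kernel receive buffer is empty.
                    received.set(in.read());
                } catch (Exception e) {
                    received.set(-1);
                } finally {
                    done.countDown();
                }
            });
            reader.start();

            // Nothing has been sent yet, so the reader cannot have finished.
            boolean blocked = !done.await(200, TimeUnit.MILLISECONDS);

            // Sending one byte lets the kernel wake the blocked reader.
            OutputStream out = client.getOutputStream();
            out.write(42);
            out.flush();

            done.await(5, TimeUnit.SECONDS);
            return new Result(blocked, received.get());
        }
    }

    public static void main(String[] args) throws Exception {
        Result r = run();
        System.out.println("blocked before write: " + r.blockedBeforeWrite
                + ", byte read: " + r.byteRead);
    }
}
```

While the reader is parked inside `read()` it consumes no CPU, which is why adding more threads lets the machine keep working on other connections.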
Blocking I/O model
Disadvantages of blocking I/O
Blocking I/O scales poorly because it relies heavily on threads:
- Each Java thread's stack typically takes 512 KB to 1 MB, so too many threads can exhaust JVM memory.
- Frequent context switching between a large number of threads wastes CPU time.
- Many I/O threads waking at once produces a sawtooth load on the system.
2. NIO programming
Synchronous non-blocking I/O model
With non-blocking I/O, read returns an EWOULDBLOCK error immediately if the kernel buffer holds no data, rather than suspending the thread. The process typically polls by calling read repeatedly, and the data is copied into user space once the buffer has some. (Java's SocketChannel.read reports the "no data yet" case by returning 0.)
So "synchronous non-blocking" means that the socket's read and write calls do not block, but the user process still has to poll them itself, which is why the model is still synchronous. What NIO adds is a way to keep the CPU busy without starting new threads: I/O multiplexing.
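The polling behaviour can be sketched on a loopback connection. This example is an illustration rather than code from the original article; the class name `NonBlockingReadDemo`, the "ping" payload, and the 1 ms sleep between polls are assumptions. In non-blocking mode, `SocketChannel.read` returns 0 straight away when nothing has arrived, and the caller has to keep asking.

```java
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;

class NonBlockingReadDemo {

    /**
     * Returns {bytes read before any data was sent,
     *          number of empty polls after sending,
     *          bytes read once data arrived}.
     */
    static int[] run() throws Exception {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            // Port 0 asks the OS for any free port on the loopback interface.
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));

            try (SocketChannel client = SocketChannel.open(server.getLocalAddress());
                 SocketChannel accepted = server.accept()) {

                accepted.configureBlocking(false);
                ByteBuffer buf = ByteBuffer.allocate(64);

                // Nothing has been sent: the call returns 0 instead of suspending the thread.
                int first = accepted.read(buf);

                client.write(ByteBuffer.wrap("ping".getBytes(StandardCharsets.UTF_8)));

                // The user process polls until the kernel buffer has data.
                int emptyPolls = 0;
                int n;
                while ((n = accepted.read(buf)) == 0) {
                    emptyPolls++;
                    Thread.sleep(1);
                }
                return new int[] {first, emptyPolls, n};
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int[] r = run();
        System.out.println("first read: " + r[0] + ", empty polls: " + r[1] + ", bytes read: " + r[2]);
    }
}
```

The polling loop is exactly the wasted work that I/O multiplexing removes: a selector lets the thread sleep until the kernel reports that a read will succeed.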
2.1 I/O multiplexing technology
On Linux, select, poll, or epoll lets a single thread monitor multiple sockets. If any socket has data in its read buffer, the call returns immediately and the readable sockets can be read. If every socket's read buffer is empty, the call blocks, that is, the thread is suspended.
Linux started with select, but select was slow, and epoll eventually became the preferred mechanism.
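The same idea can be tried from Java without sockets. The following is a minimal sketch, not the article's code: it uses two `java.nio.channels.Pipe` instances as stand-ins for sockets, and the class name `MultiplexDemo` and the attachments "A" and "B" are assumptions. One thread watches both read ends through a single `Selector`; nothing is ready until a byte is written to one of them.

```java
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

class MultiplexDemo {

    /** Returns "readyBefore:readyAfter:attachments of the ready keys". */
    static String run() throws Exception {
        Pipe a = Pipe.open();
        Pipe b = Pipe.open();
        try (Selector selector = Selector.open()) {
            // Channels must be non-blocking before they can be registered.
            a.source().configureBlocking(false);
            b.source().configureBlocking(false);
            a.source().register(selector, SelectionKey.OP_READ, "A");
            b.source().register(selector, SelectionKey.OP_READ, "B");

            // Both read buffers are empty, so no channel is ready.
            int readyBefore = selector.selectNow();

            // Make only pipe B readable.
            b.sink().write(ByteBuffer.wrap(new byte[] {1}));

            // Blocks (up to 5 s) until at least one registered channel is ready.
            int readyAfter = selector.select(5000);

            StringBuilder ready = new StringBuilder();
            for (SelectionKey key : selector.selectedKeys()) {
                ready.append(key.attachment());
            }
            return readyBefore + ":" + readyAfter + ":" + ready;
        } finally {
            a.sink().close();
            a.source().close();
            b.sink().close();
            b.source().close();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run());
    }
}
```

Only the channel that actually has data shows up in the selected-key set, so the thread never has to probe the idle one.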
2.1.1 Advantages of epoll
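- The number of open socket descriptors (FDs) is limited only by the operating system's maximum number of file handles, whereas select supports at most 1024 by default.
- select scans every socket on each call, whereas epoll only handles the sockets that are active.
- epoll_wait copies only the ready events to user space, so the copy cost per call depends on the number of active sockets rather than on the number registered. (The common claim that epoll uses mmap to share memory between kernel space and user space does not match the Linux implementation.)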
2.2 Working mechanism of NIO
NIO is essentially an event-driven model, and its most important component is the Selector, which provides the ability to select ready events. We register a Channel with a Selector, and the Selector polls the registered Channels through its select method (backed by epoll on Linux). When a Channel becomes read-ready or write-ready, or a new connection arrives, the select call that would otherwise block returns, and the ready Channels can be retrieved through their SelectionKeys (a SelectionKey is returned when a Channel is registered with a Selector).
When the Selector calls select, no thread loops over the Channels looking for a ready one. Instead, the operating system uses epoll to notify the JVM thread which Channel has a read-ready or write-ready event. In that sense the select method behaves more like a listener.
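The core purpose of multiplexing is to handle many Channels with few threads, not necessarily with exactly one thread internally. In the JDK's select-based Windows implementation, for example, the number of helper threads depends on the number of Channels, with one new thread created for every 1023 Channels registered; the epoll-based Linux implementation does not need this.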
The heart of NIO is the multiplexer and the event model; understanding these two makes it much easier to see how NIO works. NIO felt very complicated when I first learned it, but with a deeper understanding of TCP I found that it is not difficult. When using NIO, the core step is to register the Channel, together with the events to listen for, with the Selector.
Events supported by different types of channels
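The JDK reports which events each channel type supports through `validOps()`. The sketch below simply asks the three common channel types and prints the answer; the class name `ValidOpsDemo` and its helper methods are hypothetical names introduced for this illustration.

```java
import java.io.IOException;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;

class ValidOpsDemo {

    /** Turns an operation bit set into a readable list of SelectionKey constant names. */
    static String describe(int ops) {
        List<String> names = new ArrayList<>();
        if ((ops & SelectionKey.OP_ACCEPT) != 0) names.add("OP_ACCEPT");
        if ((ops & SelectionKey.OP_CONNECT) != 0) names.add("OP_CONNECT");
        if ((ops & SelectionKey.OP_READ) != 0) names.add("OP_READ");
        if ((ops & SelectionKey.OP_WRITE) != 0) names.add("OP_WRITE");
        return String.join(" | ", names);
    }

    /** Returns the supported events for ServerSocketChannel, SocketChannel, DatagramChannel, in that order. */
    static String[] report() throws IOException {
        try (ServerSocketChannel server = ServerSocketChannel.open();
             SocketChannel socket = SocketChannel.open();
             DatagramChannel datagram = DatagramChannel.open()) {
            return new String[] {
                describe(server.validOps()),
                describe(socket.validOps()),
                describe(datagram.validOps())
            };
        }
    }

    public static void main(String[] args) throws IOException {
        String[] r = report();
        System.out.println("ServerSocketChannel: " + r[0]);
        System.out.println("SocketChannel:       " + r[1]);
        System.out.println("DatagramChannel:     " + r[2]);
    }
}
```

Registering a channel for an event outside its `validOps()` set throws `IllegalArgumentException`, which is why a ServerSocketChannel is registered only for OP_ACCEPT.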
NIO event model schematic diagram:
2.2.1 Code examples
ServerReactor
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.net.ServerSocket;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;
    import java.util.Set;

    import lombok.extern.slf4j.Slf4j;

    @Slf4j
    public class ServerReactor implements Runnable {
        private final Selector selector;
        private final ServerSocketChannel serverSocketChannel;
        private volatile boolean stop = false;

        public ServerReactor(int port, int backlog) throws IOException {
            selector = Selector.open();
            serverSocketChannel = ServerSocketChannel.open();
            ServerSocket serverSocket = serverSocketChannel.socket();
            // SO_REUSEADDR must be set before bind() to have a defined effect
            serverSocket.setReuseAddress(true);
            serverSocket.bind(new InetSocketAddress(port), backlog);
            serverSocketChannel.configureBlocking(false);
            // Register the channel with the multiplexer and listen for ACCEPT events
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        }

        public void setStop(boolean stop) {
            this.stop = stop;
        }

        @Override
        public void run() {
            try {
                while (!stop && !Thread.interrupted()) {
                    int num = selector.select();
                    Set<SelectionKey> selectionKeys = selector.selectedKeys();
                    Iterator<SelectionKey> it = selectionKeys.iterator();
                    while (it.hasNext()) {
                        SelectionKey key = it.next();
                        // Remove the key from the selected set; otherwise the same event
                        // is handled again on the next select()
                        it.remove();
                        try {
                            handle(key);
                        } catch (Exception e) {
                            if (key != null) {
                                key.cancel();
                                if (key.channel() != null) {
                                    key.channel().close();
                                }
                            }
                        }
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
            if (selector != null) {
                try {
                    selector.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }

        private void handle(SelectionKey key) throws Exception {
            if (key.isValid()) {
                // ACCEPT event: a new connection is waiting
                if (key.isAcceptable()) {
                    ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
                    // The ACCEPT event must be consumed by calling accept();
                    // otherwise the multiplexer keeps reporting it in an endless loop.
                    // Because the server channel is non-blocking, accept() returns null
                    // when no connection is available instead of blocking.
                    SocketChannel socketChannel = serverSocketChannel.accept();
                    if (socketChannel != null) {
                        // A channel must be non-blocking before it can be registered
                        socketChannel.configureBlocking(false);
                        socketChannel.register(selector, SelectionKey.OP_READ);
                    }
                }
                if (key.isReadable()) {
                    SocketChannel socketChannel = (SocketChannel) key.channel();
                    ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                    int readBytes = socketChannel.read(readBuffer);
                    if (readBytes > 0) {
                        readBuffer.flip();
                        byte[] bytes = new byte[readBuffer.remaining()];
                        readBuffer.get(bytes);
                        String content = new String(bytes);
                        System.out.println("recv client content: " + content);
                        ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
                        writeBuffer.put(("server received: " + content).getBytes());
                        writeBuffer.flip();
                        socketChannel.write(writeBuffer);
                    } else if (readBytes < 0) {
                        // -1 means the peer closed the connection
                        key.cancel();
                        socketChannel.close();
                    }
                }
            }
        }
    }
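The read handler depends on the `put`, `flip`, `get` sequence of `ByteBuffer`, which is easy to get wrong. Here is a small standalone sketch of that sequence; the class name `BufferFlipDemo` and both method names are assumptions made for this illustration, and UTF-8 is chosen explicitly where the article's code uses the platform default charset.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class BufferFlipDemo {

    /** Writes text into a buffer, flips it, and reads the same bytes back out. */
    static String roundTrip(String text) {
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        // After put(): position = number of bytes written, limit = capacity (1024).
        buffer.put(text.getBytes(StandardCharsets.UTF_8));
        // flip(): limit = old position, position = 0, so only the written bytes are readable.
        buffer.flip();
        byte[] bytes = new byte[buffer.remaining()];
        buffer.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    /** Shows what remaining() reports when flip() is forgotten. */
    static int remainingWithoutFlip(String text) {
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        buffer.put(text.getBytes(StandardCharsets.UTF_8));
        // Without flip(), remaining() is the free space left, not the data written.
        return buffer.remaining();
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("hello"));
        System.out.println(remainingWithoutFlip("hello"));
    }
}
```

Forgetting `flip()` before `socketChannel.write(writeBuffer)` would send the unused tail of the buffer instead of the message, which is why both handlers call it right after `put`.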
ClientReactor
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.net.Socket;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;
    import java.util.Scanner;
    import java.util.Set;

    public class ClientReactor implements Runnable {
        final String host;
        final int port;
        final SocketChannel socketChannel;
        final Selector selector;
        private volatile boolean stop = false;

        public ClientReactor(String host, int port) throws IOException {
            this.socketChannel = SocketChannel.open();
            this.socketChannel.configureBlocking(false);
            Socket socket = this.socketChannel.socket();
            socket.setTcpNoDelay(true);
            this.selector = Selector.open();
            this.host = host;
            this.port = port;
        }

        @Override
        public void run() {
            try {
                // In blocking mode, connect() blocks until the connection is established
                // or an I/O error occurs.
                // In non-blocking mode, connect() only initiates the connection. It returns true
                // if the connection is established immediately (as can happen with a local
                // connection); otherwise it returns false, and the connection must be completed
                // later by calling finishConnect().
                // socketChannel.isConnectionPending() reports whether a connection attempt
                // is in progress.
                if (socketChannel.connect(new InetSocketAddress(host, port))) {
                    socketChannel.register(selector, SelectionKey.OP_READ);
                    doWrite(socketChannel);
                } else {
                    socketChannel.register(selector, SelectionKey.OP_CONNECT);
                }
                while (!stop && !Thread.interrupted()) {
                    int num = selector.select();
                    Set<SelectionKey> selectionKeys = selector.selectedKeys();
                    Iterator<SelectionKey> it = selectionKeys.iterator();
                    while (it.hasNext()) {
                        SelectionKey key = it.next();
                        // Remove the key from the selected set; otherwise the same event
                        // is handled again on the next select()
                        it.remove();
                        try {
                            handle(key);
                        } catch (Exception e) {
                            if (key != null) {
                                key.cancel();
                                if (key.channel() != null) {
                                    key.channel().close();
                                }
                            }
                        }
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
            if (selector != null) {
                try {
                    selector.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }

        private void handle(SelectionKey key) throws IOException {
            if (key.isValid()) {
                SocketChannel socketChannel = (SocketChannel) key.channel();
                if (key.isConnectable()) {
                    // Complete the connection that connect() started
                    if (socketChannel.finishConnect()) {
                        socketChannel.register(selector, SelectionKey.OP_READ);
                        doWrite(socketChannel);
                    }
                }
                if (key.isReadable()) {
                    ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                    int readBytes = socketChannel.read(readBuffer);
                    if (readBytes > 0) {
                        readBuffer.flip();
                        byte[] bytes = new byte[readBuffer.remaining()];
                        readBuffer.get(bytes);
                        System.out.println("recv server content: " + new String(bytes));
                    } else if (readBytes < 0) {
                        // -1 means the server closed the connection
                        key.cancel();
                        socketChannel.close();
                    }
                }
            }
        }

        // Reads lines from standard input on a separate thread and sends them to the server
        private void doWrite(SocketChannel socketChannel) {
            Scanner scanner = new Scanner(System.in);
            new Thread(() -> {
                while (scanner.hasNext()) {
                    try {
                        ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
                        writeBuffer.put(scanner.nextLine().getBytes());
                        writeBuffer.flip();
                        socketChannel.write(writeBuffer);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }
    }
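The connect handshake in the client can be isolated into a short loopback sketch. This is an illustration under stated assumptions rather than the article's code: the class name `NonBlockingConnectDemo`, the ephemeral loopback port, and the 1-second select timeout are all made up for the example. It covers both outcomes of a non-blocking `connect()`: an immediate `true`, or `false` followed by `finishConnect()` once the selector reports OP_CONNECT.

```java
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

class NonBlockingConnectDemo {

    /** Connects to a local listener in non-blocking mode and reports whether the channel ends up connected. */
    static boolean connect() throws Exception {
        try (ServerSocketChannel server = ServerSocketChannel.open();
             Selector selector = Selector.open();
             SocketChannel client = SocketChannel.open()) {

            // Port 0 asks the OS for any free port; the listen backlog completes the handshake.
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            client.configureBlocking(false);

            // connect() may return true right away for a local connection.
            if (!client.connect(server.getLocalAddress())) {
                // Otherwise wait for the connect-ready event, then finish the handshake.
                client.register(selector, SelectionKey.OP_CONNECT);
                while (!client.finishConnect()) {
                    selector.select(1000);
                    selector.selectedKeys().clear();
                }
            }
            return client.isConnected();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("connected: " + connect());
    }
}
```

If the connection attempt fails, `finishConnect()` throws an `IOException` rather than returning false, so the loop cannot spin forever on a refused connection.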