This article continues the in-depth look at Java IO and NIO (I), detailing how NIO solves the problem of blocking APIs in network communication.

We will work through this problem step by step, starting from a file server.

First attempt

Suppose a file server offers an image upload service and a large number of clients have images to upload. How should we provide this service?

First, I plan to have the client upload the file over a TCP connection. A new connection is established for each upload, and the client disconnects after sending one image.

With that in mind, take a look at how I wrote the file server.

File Service Server

Check out my GitHub code

  • BioServer
  • BioClient
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

class BioServer {
    public static void main(String[] args) throws IOException {
        ServerSocketChannel acceptSocket = ServerSocketChannel.open();
        acceptSocket.bind(new InetSocketAddress(6666));
        while (true) {
            // accept() blocks until a client connects
            SocketChannel tcpSocket = acceptSocket.accept();
            Task task = new Task(tcpSocket);
            task.start();
        }
    }
}

class Task extends Thread {
    SocketChannel tcpSocket;

    public Task(SocketChannel tcpSocket) {
        this.tcpSocket = tcpSocket;
    }

    @Override
    public void run() {
        try {
            handleAAccept(tcpSocket);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    // Save the image from the client locally
    private static void handleAAccept(SocketChannel tcpSocket) throws IOException {
        try (FileChannel fileChannel = FileChannel.open(Paths.get("pic.png"), StandardOpenOption.WRITE, StandardOpenOption.CREATE)) {
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            while (tcpSocket.read(buffer) != -1) { // read returns -1 only once the peer has closed the connection
                buffer.flip();
                fileChannel.write(buffer);
                buffer.clear(); // Switch back to write mode so the channel can keep filling the buffer
            }
        }
    }
}

That is what the server-side code looks like:

  • ServerSocketChannel.open() opens a ServerSocketChannel, which is then bound to port 6666 to listen for connections
  • A while loop calls acceptSocket.accept() to wait for connections to be established.
    • accept blocks
    • Once a connection is established, it is handed to a Task thread for processing.
  • The Task reads the data sent over the tcpSocket connection and stores it in a file.

In this implementation, a connection needs to be handled by one thread.

Analyze blocked system calls

In this implementation, a connection needs to be handled by one thread, mainly because read/write blocks.

The blocking of the read

read reads a sequence of bytes from a channel into the given buffer.

  • tcpSocket.read(buffer) behaves like the read system call discussed earlier: it blocks.
  • If the receive buffer has data available, read returns that data, attempting to read up to r bytes from the channel, where r is the number of bytes remaining in the buffer, that is, dst.remaining().

The data in the TCP receive buffer that the application can read is shown in green in the figure below:

  • The read system call returns -1 only after the peer has sent a FIN packet and all acknowledged data has been read
    • The peer sends a FIN by calling tcpSocket.shutdownOutput(); the underlying shutdown() system call shuts down communication on a socket in the specified direction
    • or by calling tcpSocket.close();
    • tcpSocket.read(buffer) returns -1 once RCV_BUF has no data left to read and a FIN packet has been received
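
The read behaviour described above can be observed on a loopback connection. The following is a minimal sketch, not the article's code: the class name ReadUntilFinDemo, the 3-byte buffer and the 5-byte message are assumptions chosen for illustration. Each read returns at most dst.remaining() bytes, and -1 appears only after the client has called shutdownOutput() and all data has been consumed.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class ReadUntilFinDemo {
    /** Returns every value that read() produced, ending with -1. */
    static List<Integer> readAll() throws IOException {
        List<Integer> results = new ArrayList<>();
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            // Port 0 asks the OS for any free port on the loopback interface
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress());
                 SocketChannel accepted = server.accept()) {
                client.write(ByteBuffer.wrap("hello".getBytes(StandardCharsets.US_ASCII)));
                client.shutdownOutput(); // sends a FIN after the 5 bytes

                ByteBuffer dst = ByteBuffer.allocate(3); // r = dst.remaining() = 3
                int n;
                do {
                    n = accepted.read(dst); // blocks until data or the FIN arrives
                    results.add(n);
                    dst.clear();
                } while (n != -1);
            }
        }
        return results;
    }

    public static void main(String[] args) throws IOException {
        System.out.println(readAll());
    }
}
```

Because the buffer holds only 3 bytes, the 5-byte message needs at least two reads before the final -1.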

The read system call returns differently in blocking and non-blocking modes, as shown in the figure below.

The blocking of the write

For TCP, if the available window is 0 and the local send buffer is full, the write operation blocks.

I ran a small experiment: the client kept sending data while the server application never read any. The write on the sender eventually blocked, and the receive buffer on the receiver turned out to be roughly 500 KB in size.

You can try this experiment yourself to see the blocked write.
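
One way to reproduce the experiment without hanging the program is sketched below. It is an illustration under assumptions, not the author's test: the class name WriteStallDemo is hypothetical, and it uses a non-blocking channel purely as a probe, treating about 200 ms of zero-byte writes as the point where a blocking write would have stalled. The total it reports depends on the operating system's buffer sizes.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

class WriteStallDemo {
    /** Keeps writing to a peer that never reads; returns the bytes the kernel accepted. */
    static long fillUntilStalled() throws IOException, InterruptedException {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress());
                 SocketChannel accepted = server.accept()) { // accepted, but never read from
                client.configureBlocking(false); // probe only: write returns 0 instead of blocking
                ByteBuffer chunk = ByteBuffer.allocate(64 * 1024);
                long total = 0;
                int idleRounds = 0;
                // Treat 20 consecutive zero-byte writes (about 200 ms) as "stalled"
                while (idleRounds < 20) {
                    chunk.clear();
                    int n = client.write(chunk);
                    if (n > 0) {
                        total += n;
                        idleRounds = 0;
                    } else {
                        idleRounds++;
                        Thread.sleep(10);
                    }
                }
                return total;
            }
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("accepted before stalling: " + fillUntilStalled() + " bytes");
    }
}
```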

Transfer process optimization – use zero copy

As analyzed previously, NIO uses MappedByteBuffer to greatly reduce the amount of data copying.

We can apply the same optimization in this file server, provided the size of the data to be transferred is agreed in advance.

    void handleAAccept(SocketChannel tcpSocket) throws IOException {
        FileChannel fileChannel = FileChannel.open(Paths.get("test.png"), StandardOpenOption.WRITE, StandardOpenOption.CREATE);
        // 19300 is the agreed size of the image in bytes
        fileChannel.transferFrom(tcpSocket, 0, 19300);
    }

The image sent by the client is saved locally via fileChannel.transferFrom(tcpSocket, 0, 19300). The client's logic for sending the image can likewise be simplified to fileChannel.transferTo(0, size, tcpSocket).

public class BioClient {
    public static void main(String[] args) throws IOException {
        SocketChannel tcpSocket = SocketChannel.open(new InetSocketAddress("127.0.0.1", 6666));
        FileChannel fileChannel = FileChannel.open(Paths.get("***/http.png"), StandardOpenOption.READ);
        long size = fileChannel.size();
        fileChannel.transferTo(0, size, tcpSocket);
        fileChannel.close();
        tcpSocket.close();
    }
}
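
A single transferTo call is not guaranteed to move every requested byte, so a sender may want to loop until the whole file is out. The helper below is a sketch under assumptions: transferFully and the class name TransferLoop are hypothetical names, and the loop is meant for blocking targets only (on a non-blocking socket it would spin whenever transferTo returns 0). The demo copies file to file so that it runs without a server.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

class TransferLoop {
    /** Calls transferTo repeatedly until the whole file has been handed to the target. */
    static long transferFully(FileChannel source, WritableByteChannel target) throws IOException {
        long size = source.size();
        long position = 0;
        while (position < size) {
            // transferTo may move fewer bytes than requested, so advance by what it reports
            position += source.transferTo(position, size - position, target);
        }
        return position;
    }

    public static void main(String[] args) throws IOException {
        Path in = Files.createTempFile("transfer-in", ".bin");
        Path out = Files.createTempFile("transfer-out", ".bin");
        try {
            Files.write(in, new byte[19300]); // same size as the example image above
            try (FileChannel src = FileChannel.open(in, StandardOpenOption.READ);
                 FileChannel dst = FileChannel.open(out, StandardOpenOption.WRITE)) {
                System.out.println("transferred " + transferFully(src, dst) + " bytes");
            }
        } finally {
            Files.deleteIfExists(in);
            Files.deleteIfExists(out);
        }
    }
}
```

In the client above, the same helper could be called with tcpSocket as the target instead of a second file channel.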

Blocking of transferTo and transferFrom

The transferTo and transferFrom operations block here as well.

transferFrom transfers bytes from the given readable byte channel into this channel's file.

  • For file-to-file transfers, the time spent blocked is bounded
  • For network-to-file transfers, the operation can block for a long time waiting for data

Here is an example: the client sends 1000 bytes, and transferFrom takes the 800 bytes that are available and returns.

And here is another situation:

transferFrom keeps waiting for the next two bytes until the client sends two more bytes or the connection is closed.
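
Both situations can be tried on a loopback connection. The sketch below is one possible reading of them, with hypothetical names (TransferFromDemo, receive): the receiver asks for an agreed number of bytes and loops on transferFrom until it has them or the sender has closed its side. The sender always calls shutdownOutput(), so the demo never hangs waiting for bytes that will not come.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

class TransferFromDemo {
    /** The client sends {@code sent} bytes; the server asks for {@code expected}. Returns the bytes stored. */
    static long receive(int sent, long expected) throws IOException {
        Path file = Files.createTempFile("upload", ".bin");
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress());
                 SocketChannel accepted = server.accept();
                 FileChannel out = FileChannel.open(file, StandardOpenOption.WRITE)) {
                client.write(ByteBuffer.allocate(sent));
                client.shutdownOutput(); // signal that nothing more will be sent

                long received = 0;
                while (received < expected) {
                    // Blocks while waiting for data from the network
                    long n = out.transferFrom(accepted, received, expected - received);
                    if (n <= 0) {
                        break; // the peer closed before sending everything
                    }
                    received += n;
                }
                return received;
            }
        } finally {
            Files.deleteIfExists(file);
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("asked for 800 of 1000 sent: " + receive(1000, 800));
        System.out.println("asked for 1000 of 800 sent: " + receive(800, 1000));
    }
}
```

Without the shutdownOutput() call, the second case would stay blocked inside transferFrom until more data arrived or the connection was closed.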

Summary of the initial implementation: the BIO model

This first file server meets the requirement of uploading images and has been optimized with zero-copy techniques. It follows the BIO model.

BIO stands for blocking IO. A blocked thread does not consume CPU time, but it does tie up the thread. Since each connection gets its own thread, this model cannot support C10K-level concurrency.

So can we use non-blocking IO instead?

Non-blocking service model

In Java NIO, you can make a channel's reads and writes non-blocking by setting a parameter, for example tcpSocket.configureBlocking(false);. Instead of blocking, read then returns 0 when no data is available.
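
A minimal sketch of that behaviour follows. The class name NonBlockingReadDemo is hypothetical; the client connects over loopback but sends nothing, so a blocking read would hang where the non-blocking one returns at once.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

class NonBlockingReadDemo {
    /** Reads from a connection on which the client has sent nothing. */
    static int readWithoutData() throws IOException {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress());
                 SocketChannel accepted = server.accept()) {
                accepted.configureBlocking(false);
                // No data has arrived and the connection is still open
                return accepted.read(ByteBuffer.allocate(16));
            }
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("read returned " + readWithoutData());
    }
}
```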

A non-blocking implementation of the Reply server

Setting the file server aside for a moment, let the client and the server simply talk to each other, as shown below:

The server reads a small text message from the client and replies that it was received. The specific code is:

  • Server code

  • The code on the client side

Part of the code is as follows:

public class MultiServer {
    public static void main(String[] args) throws IOException {
        ServerSocketChannel serverSocket = ServerSocketChannel.open();
        serverSocket.bind(new InetSocketAddress(6666));
        serverSocket.configureBlocking(false);
        Selector selector = Selector.open();
        serverSocket.register(selector, SelectionKey.OP_ACCEPT);
        while (selector.select() > 0) {//select returns the number of ready events
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                iterator.remove();
                if (key.isAcceptable()) {
                    SocketChannel tcpSocket = serverSocket.accept();
                    tcpSocket.configureBlocking(false);
                    tcpSocket.register(selector, SelectionKey.OP_READ);
                    System.out.println("Establish a connection with the client");
                } else if (key.isReadable()) {
                    try {
                        handle(key);
                    } catch (IOException e) {
                        key.channel().close();
                    }
                }
            }
        }
    }
    private static void handle( SelectionKey key) throws IOException {
        SocketChannel tcpSocket = (SocketChannel) key.channel();
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        String received="";
        int i=0;
        while ((i=tcpSocket.read(buffer) )> 0) {//>0 indicates no blocking
            buffer.flip();
            received = new String(buffer.array(), 0, buffer.remaining());
            System.out.println("The client said." + received);
            buffer.clear();
        }
        System.out.println("i = " + i);
        buffer.put(("I got it." + received).getBytes(StandardCharsets.UTF_8));
        buffer.flip();
        tcpSocket.write(buffer);
        buffer.clear();
    }
}

You can see that multiple clients are talking to the server, and the server uses only one thread! This relies on Selector multiplexing. The underlying mechanism is epoll, and the Java NIO implementation uses epoll in level-triggered mode. After fetching a key with SelectionKey key = iterator.next(); you need to call iterator.remove(); removing the key indicates that the event has been handled.

  • Register the listening channel: serverSocket.configureBlocking(false); serverSocket.register(selector, SelectionKey.OP_ACCEPT);
  • For each established connection, register again: tcpSocket.configureBlocking(false); tcpSocket.register(selector, SelectionKey.OP_READ);
  • The Selector then listens for the registered event types on these channels
    • key.isAcceptable() indicates that a connection is ready to be accepted. Calling SocketChannel tcpSocket = serverSocket.accept() at this point does not block and returns the established connection
    • key.isReadable() means the key's channel has a readable event; calling read at this point does not block and reads the data directly.
    • write is also non-blocking.
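
Why the key has to be removed by hand can be checked directly: the Selector adds ready keys to its selected-key set but does not take them out again. The following sketch, with the hypothetical class name SelectedKeyDemo, accepts one loopback connection and reports the size of that set before and after iterator.remove().

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

class SelectedKeyDemo {
    /** Returns the size of the selected-key set before and after explicit removal. */
    static int[] selectedSetSizes() throws IOException {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            server.configureBlocking(false);
            server.register(selector, SelectionKey.OP_ACCEPT);
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
                selector.select(); // returns once the pending connection can be accepted
                try (SocketChannel accepted = server.accept()) {
                    // The event has been handled, yet the key is still in the set
                    int before = selector.selectedKeys().size();
                    Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                    iterator.next();
                    iterator.remove(); // mark the event as processed
                    int after = selector.selectedKeys().size();
                    return new int[] {before, after};
                }
            }
        }
    }

    public static void main(String[] args) throws IOException {
        int[] sizes = selectedSetSizes();
        System.out.println("before remove: " + sizes[0] + ", after remove: " + sizes[1]);
    }
}
```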

Register the OP_WRITE event

int write(ByteBuffer src) returns the number of bytes actually written. In non-blocking mode it returns immediately even if the prepared buffer has not been fully written out, and it may return 0 when the socket send buffer has no free space. So when writing large amounts of data, register for the OP_WRITE event, and select will notify the application once the send window is available again.

if (writed<remaining){
    tcpSocket.register(selector,SelectionKey.OP_WRITE);
    break;
}

When notified, remember to handle the event:

if (selectionKey.isWritable()){
    // Write the remaining data
    continueWriteFileToSocket(selectionKey,channel, fileChannel, buffer, selector);
}

private static void continueWriteFileToSocket(SelectionKey key, SocketChannel tcpSocket, FileChannel fileChannel, ByteBuffer buffer, Selector selector) throws IOException {
    // The kernel took only part of the buffer last time, so its position has already advanced
    tcpSocket.write(buffer);
    if (buffer.hasRemaining()) {
        // Still not fully written: wait for the next OP_WRITE notification
        return;
    }
    buffer.clear();
    // The leftover data is out, so keep filling the buffer from the file and sending it to the network
    while (fileChannel.read(buffer) != -1) {
        buffer.flip();
        int remaining = buffer.remaining();
        int writed = tcpSocket.write(buffer);
        System.out.println("write = " + writed);
        if (writed < remaining) {
            return;
        } else {
            buffer.clear();
        }
    }
    key.cancel();
}

  • Cancel the key once everything has been written.

Zero copy in non-blocking scenarios

In non-blocking scenarios, transferTo can also be used to reduce the number of copies, but keep in mind the partial-write issue described above.

    long position;

    void transferFileToSocket(SocketChannel tcpSocket, FileChannel fileChannel, Selector selector) throws IOException {
        long l = fileChannel.transferTo(position, fileChannel.size(), tcpSocket);
        position += l;
        System.out.println("position = " + position);
        if (position < fileChannel.size()) {
            // Not finished yet: ask to be notified when the socket is writable again
            tcpSocket.register(selector, SelectionKey.OP_WRITE | SelectionKey.OP_READ);
            System.out.println("selector.keys() = " + selector.keys());
        }
    }

    void continueTransferFileToSocket(SelectionKey key, SocketChannel tcpSocket, FileChannel fileChannel, Selector selector) throws IOException {
        System.out.println("continueTransferFileToSocket");
        long l = fileChannel.transferTo(position, fileChannel.size(), tcpSocket);
        position += l;
        System.out.println("position = " + position);
        if (position < fileChannel.size()) {
            // Not finished yet
            return;
        }
        // Everything has been sent: go back to listening for reads only
        tcpSocket.register(selector, SelectionKey.OP_READ);
    }

Here the channel is also registered for read events, so the key is not cancelled when writing finishes. Instead:

  • To start writing, register SelectionKey.OP_WRITE | SelectionKey.OP_READ
  • Once writing has finished, re-register with SelectionKey.OP_READ only
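
An equivalent way to switch write interest on and off is to edit the interest set of the existing key with SelectionKey.interestOps, rather than calling register again. This is a swapped-in alternative, not the article's code, and the helper names enableWrite and disableWrite are hypothetical. The sketch uses an unconnected channel, since only the interest set is being inspected.

```java
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

class InterestOpsDemo {
    /** Adds OP_WRITE while leaving the other interests (such as OP_READ) untouched. */
    static void enableWrite(SelectionKey key) {
        key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
    }

    /** Removes OP_WRITE once the pending data has been fully written. */
    static void disableWrite(SelectionKey key) {
        key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
    }

    /** Returns the interest set while writing and after writing has finished. */
    static int[] demo() throws IOException {
        try (Selector selector = Selector.open();
             SocketChannel channel = SocketChannel.open()) {
            channel.configureBlocking(false);
            SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
            enableWrite(key);
            int whileWriting = key.interestOps();
            disableWrite(key);
            int afterWriting = key.interestOps();
            return new int[] {whileWriting, afterWriting};
        }
    }

    public static void main(String[] args) throws IOException {
        int[] ops = demo();
        System.out.println("while writing: " + ops[0] + ", after writing: " + ops[1]);
    }
}
```

Leaving OP_WRITE set after the data is out would make select return immediately on every call, because a socket with free send-buffer space is almost always writable.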

So far, NIO has solved not only the problem of multiple data copies, but also the problem of blocking APIs.

Close the connection properly

While the server is processing requests, be sure to pay attention to connection handling, because a client can close the connection at any time. If the server goes down because one client shut down, every other client is affected. For example, when a client is forcibly closed, the server sees an IOException. The exception can be caught anywhere along the processing path; if nothing catches it, the server exits unexpectedly, which disrupts the service.

A packet capture on port 6666 of the server:

# tcpdump -i any port 6666
localhost.49775 > localhost.6666: Flags [S], seq 3053657778, win 65535, options [mss 16324,nop,wscale 6,nop,nop,TS val 1370645599 ecr 0,sackOK,eol], length 0
localhost.6666 > localhost.49775: Flags [S.], seq 3892119155, ack 3053657779, win 65535, options [mss 16324,nop,wscale 6,nop,nop,TS val 1370645599 ecr 1370645599,sackOK,eol], length 0
localhost.49775 > localhost.6666: Flags [.], ack 1, win 6371, options [nop,nop,TS val 1370645599 ecr 1370645599], length 0
localhost.6666 > localhost.49775: Flags [.], ack 1, win 6371, options [nop,nop,TS val 1370645599 ecr 1370645599], length 0
localhost.49775 > localhost.6666: Flags [P.], seq 1:4, ack 1, win 6371, options [nop,nop,TS val 1370647611 ecr 1370645599], length 3
localhost.6666 > localhost.49775: Flags [.], ack 4, win 6371, options [nop,nop,TS val 1370647611 ecr 1370647611], length 0
localhost.6666 > localhost.49775: Flags [P.], seq 1:16, ack 4, win 6371, options [nop,nop,TS val 1370647612 ecr 1370647611], length 15
localhost.49775 > localhost.6666: Flags [.], ack 16, win 6371, options [nop,nop,TS val 1370647612 ecr 1370647612], length 0

localhost.49775 > localhost.6666: Flags [F.], seq 4, ack 16, win 6371, options [nop,nop,TS val 1371319563 ecr 1370647612], length 0
 localhost.6666 > localhost.49775: Flags [.], ack 5, win 6371, options [nop,nop,TS val 1371319563 ecr 1371319563], length 0
 localhost.6666 > localhost.49775: Flags [P.], seq 16:28, ack 5, win 6371, options [nop,nop,TS val 1371319573 ecr 1371319563], length 12
 localhost.49775 > localhost.6666: Flags [R], seq 3053657783, win 0, length 0

First comes the three-way handshake:

  • [S] The client sends a SYN packet: seq=3053657778, win=65535
  • [S.] The server replies with SYN+ACK: ack=3053657779, seq=3892119155, win=65535
  • [.] The client acknowledges: ack=1, win=6371

Then data is exchanged, each message answered by an acknowledgement. Finally the client closes the connection:

  • The client sends a FIN packet
  • The server kernel replies with an ACK
  • The server application keeps sending data and does not close its side of the connection
  • The client answers with a RST packet, forcibly closing the connection.

The server should handle connection closure properly, such as the following:

    SocketChannel tcpSocket = (SocketChannel) key.channel();
    ByteBuffer buffer = ByteBuffer.allocate(1024);
    int read = tcpSocket.read(buffer);
    if (read == -1) {
        // The peer sent a FIN: close our side as well
        key.channel().close();
        return;
    }

If tcpSocket.read returns -1, a FIN packet was received, and the server closes the connection as well. That gives a clean four-way close, as shown below:

# tcpdump -i any -nn -s0 -v --absolute-tcp-sequence-numbers port 6666
::1.51461 > ::1.6666: Flags [F.], cksum 0x0028 (incorrect -> 0x5f54), seq 220187003, ack 2240511794, win 6371, options [nop,nop,TS val 1372377293 ecr 1372375988], length 0
::1.6666 > ::1.51461: Flags [.], cksum 0x0028 (incorrect -> 0x5a3b), ack 220187004, win 6371, options [nop,nop,TS val 1372377293 ecr 1372377293], length 0
::1.6666 > ::1.51461: Flags [F.], cksum 0x0028 (incorrect -> 0x5a3a), seq 2240511794, ack 220187004, win 6371, options [nop,nop,TS val 1372377293 ecr 1372377293], length 0
::1.51461 > ::1.6666: Flags [.], cksum 0x0028 (incorrect -> 0x5a3a), ack 2240511795, win 6371, options [nop,nop,TS val 1372377293 ecr 1372377293], length 0

The connection is then in TIME_WAIT state.

tcp6       0      0  localhost.51947      localhost.6666     TIME_WAIT

TIME_WAIT lasts for two MSLs (2MSL), which seemed to be around 2 s here.

If the client closes the connection by sending a RST packet, the connection is released immediately. If the server then sends more data, unexpected exceptions may occur.
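
Both outcomes, a clean FIN and an abrupt reset, can be handled in one place so that a misbehaving client never takes the server down. The helper below is a sketch with hypothetical names (SafeRead, readOrClose); it treats -1 and IOException the same way, by closing the channel and reporting that the connection is gone. The second method exercises the FIN case over loopback.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

class SafeRead {
    /** Reads once; returns false and closes the channel if the peer is gone. */
    static boolean readOrClose(SocketChannel channel, ByteBuffer buffer) {
        try {
            if (channel.read(buffer) == -1) { // FIN received: close our side too
                channel.close();
                return false;
            }
            return true;
        } catch (IOException e) { // for example, the connection was reset by the peer
            try {
                channel.close();
            } catch (IOException ignored) {
                // nothing more can be done for this connection
            }
            return false;
        }
    }

    /** Loopback check: the client closes, the server notices and closes as well. */
    static boolean serverStillOpenAfterClientClose() throws IOException {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            SocketChannel client = SocketChannel.open(server.getLocalAddress());
            SocketChannel accepted = server.accept();
            client.close(); // the client sends a FIN
            readOrClose(accepted, ByteBuffer.allocate(16)); // the blocking read sees end of stream
            return accepted.isOpen();
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("server side still open: " + serverStillOpenAfterClientClose());
    }
}
```

With a Selector, closing the channel also cancels its key, so no separate key.cancel() is needed in this path.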

Summary of the IO multiplexing model: the single-threaded Reactor model

By introducing the Selector, a single thread can handle all connections and all client communication. But our implementation has some drawbacks:

  • Clients affect one another: if one client is stuck for too long, new connections are delayed
  • It does not make good use of multi-core CPUs

In fact, the example above is the single-threaded Reactor model. Redis, for example, uses this model: its GET/SET operations work on memory and are very fast, so the single-threaded Reactor model is enough to achieve high performance.

The Server Reactor single-thread processing model is shown below

File server: REACTOR multithreaded model

Let’s say we’re using the Reactor single-thread model to transfer files. We don’t know the size of the file, and we don’t know when we’ve received an image. Some conventions need to be made between the client and server:

  • The sender first announces the size of the picture
  • The receiving end must receive that many bytes and write them all to the file before the transfer is truly complete.
  • In addition, after the image has been received, time-consuming processing such as verification, compression and watermarking may follow, so we need a thread pool to handle it. That way business processing does not take time away from connection handling and network reads and writes, and services do not affect one another.
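
The size-first convention can be sketched as a small decoder that is fed whatever each read returns. Everything here is an assumption for illustration: the class name UploadFrame, an 8-byte big-endian size header, and the choice to count body bytes rather than write them to a file. Because TCP delivers a byte stream, the header itself may arrive split across several reads, which the decoder has to tolerate.

```java
import java.nio.ByteBuffer;

class UploadFrame {
    private long size = -1;    // unknown until the 8-byte header has arrived
    private long received = 0; // body bytes seen so far
    private final ByteBuffer header = ByteBuffer.allocate(Long.BYTES);

    /** Feeds one chunk read from the socket; returns true once the whole image has arrived. */
    boolean feed(ByteBuffer chunk) {
        // First collect the 8-byte size, which may be split across reads
        while (size < 0 && chunk.hasRemaining()) {
            header.put(chunk.get());
            if (!header.hasRemaining()) {
                header.flip();
                size = header.getLong();
            }
        }
        if (size >= 0) {
            long take = Math.min(chunk.remaining(), size - received);
            // A real server would write these bytes to a FileChannel here
            chunk.position(chunk.position() + (int) take);
            received += take;
        }
        return size >= 0 && received == size;
    }

    /** Sender side: prefix the image bytes with their length. */
    static ByteBuffer encode(byte[] image) {
        ByteBuffer out = ByteBuffer.allocate(Long.BYTES + image.length);
        out.putLong(image.length);
        out.put(image);
        out.flip();
        return out;
    }

    public static void main(String[] args) {
        UploadFrame decoder = new UploadFrame();
        System.out.println("complete: " + decoder.feed(encode(new byte[10])));
    }
}
```

One decoder instance would be kept per connection, for example as the attachment of its SelectionKey.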

An implementation with a Process thread pool

  • Server code
  • Client code

A thread pool, Executors.newFixedThreadPool(8), takes care of the business processing:

    static ExecutorService executorService = Executors.newFixedThreadPool(8);
    // Per-connection state: records size, acc (bytes received so far) and the target fileChannel
    static ConcurrentHashMap<SocketChannel, HashMap<String, Object>> channelMap = new ConcurrentHashMap<>();
    // Business tasks in flight, mapped to the connection waiting for the result
    static ConcurrentHashMap<Future<?>, SocketChannel> tasks = new ConcurrentHashMap<>();

                // Inside the select loop
                if (key.isReadable()) {
                    SocketChannel tcpSocket = (SocketChannel) key.channel();
                    HashMap<String, Object> prop = channelMap.get(tcpSocket);
                    ByteBuffer buffer = ByteBuffer.allocate(1024 * 4);
                    while (true) {
                        int read = tcpSocket.read(buffer);
                        if (!(read > 0)) break;
                        buffer.flip();
                        if (prop.get("size").equals(0L)) {
                            long size = 0;
                            try {
                                // Protocol: the first 8 bytes carry the file size
                                size = buffer.getLong();
                            } catch (Exception e) {
                                System.out.println("Files must be uploaded using the agreed protocol: size + file");
                                tcpSocket.close();
                                break;
                            }
                            prop.put("size", size);
                            prop.put("fileChannel", FileChannel.open(Paths.get(size + ".png"), StandardOpenOption.WRITE, StandardOpenOption.CREATE));
                        }
                        if ((Long) prop.get("size") > 0L) {
                            FileChannel fileChannel = (FileChannel) prop.get("fileChannel");
                            fileChannel.write(buffer);
                            buffer.clear();
                            Long size = (Long) prop.get("size");
                            // acc counts every byte received, including the 8-byte size header
                            long acc = (Long) prop.get("acc") + read;
                            prop.put("acc", acc);
                            System.out.println("Progress = " + ((acc - 8) * 1.0 / size) * 100);
                        }
                    }
                    completed((Long) prop.get("size"), (Long) prop.get("acc"), tcpSocket, channelMap);
                }

    private static void completed(Long size, Long acc, SocketChannel tcpSocket, ConcurrentHashMap<SocketChannel, HashMap<String, Object>> channelMap) throws IOException, ExecutionException, InterruptedException {
        if (acc == size + 8) {
            HashMap<String, Object> prop = channelMap.get(tcpSocket);
            Future<String> future = executorService.submit(new Processor(prop));
            // Record the future together with its connection
            tasks.put(future, tcpSocket);
            handleTasks();
        }
    }

    private static void handleTasks() throws InterruptedException, ExecutionException, IOException {
        // Traverse the tasks and reply for every one that has finished
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        for (Future<?> future1 : tasks.keySet()) {
            if (future1.isDone()) {
                SocketChannel tcpSocket = tasks.get(future1);
                String o = future1.get().toString();
                buffer.put(o.getBytes());
                buffer.flip();
                tcpSocket.write(buffer);
                buffer.clear();
                // Close the file, release resources ...
                tasks.remove(future1);
            }
        }
    }
} // end of the server class

class Processor implements Callable<String> {
    private final HashMap<String, Object> prop;

    Processor(HashMap<String, Object> prop) {
        this.prop = prop;
    }

    @Override
    public String call() throws Exception {
        // Simulate business processing
        int millis = new Random().nextInt(3000);
        System.out.println("sleep millis " + millis);
        Thread.sleep(millis);
        return "Image received";
    }
}

With IO multiplexing, you need a map that holds the basic properties of each file (size, name, upload progress and so on) together with its connection. Once a thread pool is involved, task handling becomes considerably more complex:

  • The business logic has to complete before data can be returned to the client
Future<String> future = executorService.submit(new Processor(prop));
            // Record the future together with its connection
            tasks.put(future, tcpSocket);
  • Tasks have to be handled in one place. You do not know when a task will finish, so select must be given a timeout; otherwise, while select blocks, finished tasks are never picked up
while (selector.select(1000) >= 0) { // Without a timeout, select could block forever and finished tasks might never be handled
    handleTasks();

The select timeout can be chosen according to the business scenario, for example 10 ms or 1 ms.
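
The effect of the timeout can be seen in isolation. In the sketch below (hypothetical class name TimedSelectDemo, with a 50 ms sleep standing in for the business processing), no channel is registered at all, so select can only return by timing out; each return gives the loop a chance to check whether the task has finished.

```java
import java.io.IOException;
import java.nio.channels.Selector;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class TimedSelectDemo {
    /** Polls a background task between select timeouts and returns its result. */
    static String awaitResult(long timeoutMillis)
            throws IOException, InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        try (Selector selector = Selector.open()) {
            Future<String> future = pool.submit(() -> {
                Thread.sleep(50); // stand-in for slow business processing
                return "done";
            });
            // select returns 0 on timeout, so the condition holds on every round
            while (selector.select(timeoutMillis) >= 0) {
                if (future.isDone()) {
                    return future.get();
                }
            }
            throw new IllegalStateException("select never returns a negative value");
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(awaitResult(10));
    }
}
```

A shorter timeout lowers the delay before a finished task is noticed, at the cost of waking the selector thread more often.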

A more elegant encapsulation

The implementation above is quite complex. Following the single responsibility principle, the code can be split into a Reactor class, an Acceptor class and a Handler class. This is the Reactor multithreading model.
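
A compact skeleton of that split might look as follows. It is a sketch under stated assumptions, not the article's implementation: the placeholder business logic simply echoes the bytes back, the pool size of 2 is arbitrary, and the worker writes the reply itself, whereas a fuller version would hand the reply back to the reactor thread and register OP_WRITE when the write is partial.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Event loop: waits for ready keys and runs whatever Runnable is attached to each. */
class Reactor implements Runnable {
    final Selector selector = Selector.open();
    final ServerSocketChannel server = ServerSocketChannel.open();
    final ExecutorService workers = Executors.newFixedThreadPool(2);

    Reactor() throws IOException {
        server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
        server.configureBlocking(false);
        server.register(selector, SelectionKey.OP_ACCEPT, new Acceptor());
    }

    @Override
    public void run() {
        try {
            while (selector.isOpen()) {
                selector.select();
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    Runnable attached = (Runnable) it.next().attachment();
                    it.remove();
                    attached.run(); // dispatch to the Acceptor or a Handler
                }
            }
        } catch (IOException | ClosedSelectorException e) {
            // the selector was closed: the reactor is shutting down
        } finally {
            workers.shutdown();
        }
    }

    /** Accepts connections and gives each one its own Handler. */
    class Acceptor implements Runnable {
        @Override
        public void run() {
            try {
                SocketChannel channel = server.accept();
                if (channel != null) {
                    channel.configureBlocking(false);
                    channel.register(selector, SelectionKey.OP_READ, new Handler(channel));
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /** Reads on the reactor thread, then hands the business work to the pool. */
    class Handler implements Runnable {
        final SocketChannel channel;

        Handler(SocketChannel channel) {
            this.channel = channel;
        }

        @Override
        public void run() {
            try {
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                if (channel.read(buffer) == -1) {
                    channel.close(); // the peer sent a FIN
                    return;
                }
                buffer.flip();
                workers.submit(() -> process(buffer));
            } catch (IOException e) {
                try { channel.close(); } catch (IOException ignored) { }
            }
        }

        /** Placeholder business logic: echo the request back to the client. */
        void process(ByteBuffer request) {
            try {
                while (request.hasRemaining()) {
                    channel.write(request);
                }
            } catch (IOException e) {
                try { channel.close(); } catch (IOException ignored) { }
            }
        }
    }
}
```

To try it, start the reactor with new Thread(new Reactor()).start(), connect a SocketChannel to reactor.server.getLocalAddress(), and close reactor.selector to stop the loop.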

Conclusion

Using a concrete example, this article has shown step by step how NIO solves the problem of blocking APIs in network communication. NIO brings many performance benefits, but it also makes the code harder to write and to understand.

The Reactor multithreading model is also used to make the file server more robust and more available.