This is the 15th day of my participation in the November Gwen Challenge. Check out the event details: The last Gwen Challenge 2021

The previous article on Juejin (juejin.cn) introduced how to use a Selector and how to handle a connection event.

Read events

While handling the accept event, once a client has established a connection with the server, the server must set the corresponding SocketChannel to non-blocking mode, register it with the selector, and subscribe it to read events. When the client then sends a message, the server can read it from that channel with the read() method.

The read() method copies data from the channel into a buffer and returns the number of bytes read. A return value of -1 means the end of the stream has been reached, that is, the client has closed the connection normally.
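
The two kinds of return value can be observed without a network connection. The following is a minimal sketch, assuming an in-memory channel created with Channels.newChannel in place of a SocketChannel; the class name ReadReturnDemo and the method readTwice are made up for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;

class ReadReturnDemo {

    // Reads from an in-memory channel twice and returns both results of read().
    static int[] readTwice(byte[] data) throws IOException {
        try (ReadableByteChannel channel = Channels.newChannel(new ByteArrayInputStream(data))) {
            ByteBuffer buffer = ByteBuffer.allocate(100);
            int first = channel.read(buffer);  // number of bytes copied into the buffer
            int second = channel.read(buffer); // nothing left: end of stream
            return new int[] { first, second };
        }
    }

    public static void main(String[] args) throws IOException {
        int[] sizes = readTwice("hello".getBytes(StandardCharsets.US_ASCII));
        System.out.println(sizes[0]); // prints 5
        System.out.println(sizes[1]); // prints -1
    }
}
```

A SocketChannel behaves the same way once the peer has closed its side of the connection: read() keeps returning -1, which is why the server code has to react to that value.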

else if (key.isReadable()) {
    try {
        SocketChannel channel = (SocketChannel) key.channel(); // get the channel
        ByteBuffer buffer = ByteBuffer.allocate(100);
        int size = channel.read(buffer); // -1 if the client closed the connection normally
        if (size == -1) {
            key.cancel();
        } else {
            buffer.flip();
            ByteBufferUtil.debugAll(buffer); // visualize the buffer contents
        }
    } catch (IOException e) {
        // the client disconnected abnormally
        e.printStackTrace();
        key.cancel();
    }
}

Note the two key.cancel() calls. Here’s what they do:

  • The first handles a normal client shutdown, where read() returns -1. The key must be cancelled here; otherwise the closed channel stays readable, the selector keeps treating the read event as unhandled, selector.select() stops blocking, and the server spins in an endless loop.
  • The second handles an abnormal client shutdown, where read() throws an IOException. The catch block cancels the key for the same reason: to prevent an endless loop.

Removing events

iter.remove() is executed on every iteration over the events, so each event is removed from the event set as it is picked up. I’ve included a screenshot of the code with comments to illustrate; the line numbers below refer to it.

In this code, you can think of two collections:

  1. The selector created on line 20 maintains the set of channels registered with it.
  2. selector.selectedKeys() on line 29 returns a second set: the set of events that have occurred.

At first, the selector’s channel set contains only ssc (the newly created ServerSocketChannel), which is interested in the accept event, and the event set is empty:

Then client A connects to the server. An accept event is added to the event set, and sc (the SocketChannel returned by accept()) is added to the selector’s channel set, interested in the read event. Note, however, that the event set does not automatically delete completed events, so the connection event stays in the event set even after it has been processed.

Next, client A sends a message to the server over that connection. The selector detects a new event (read) and adds the read event of the sc channel to the event set:

Now that there is a new event (the read event), selector.select() on line 28 stops blocking, and we iterate over the event set and process each event. Because the event set does not automatically remove completed events, the iteration starts with the old connection event and enters the connection handling on line 33. But no client is waiting to connect, so the non-blocking accept() returns null, and the next call on that null channel throws a NullPointerException.

So whenever we pick up an event for processing, we must remove it from the event set ourselves; iter.remove() on line 32 is indispensable.
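
The stale-event effect can be reproduced without a server. The following is a minimal sketch, not the code from the screenshot: it assumes two in-process java.nio.channels.Pipe objects standing in for the two channels, and the class name StaleKeyDemo and the method run are made up for illustration. The first pipe receives one byte that is consumed completely; then the second pipe receives one byte. If the first key is not removed from the selected-key set, it is still there after the second select() and still reports itself as readable, although there is nothing to read.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

class StaleKeyDemo {

    // Returns { size of the selected-key set after the second event,
    //           number of selected keys whose channel had nothing to read }.
    static int[] run(boolean removeHandledKey) throws IOException {
        Pipe first = Pipe.open();
        Pipe second = Pipe.open();
        try (Selector selector = Selector.open()) {
            first.source().configureBlocking(false);
            second.source().configureBlocking(false);
            SelectionKey firstKey = first.source().register(selector, SelectionKey.OP_READ);
            second.source().register(selector, SelectionKey.OP_READ);

            // Event 1: one byte arrives on the first pipe and is consumed completely.
            first.sink().write(ByteBuffer.wrap(new byte[] { 1 }));
            selector.select(1000);
            first.source().read(ByteBuffer.allocate(8));
            if (removeHandledKey) {
                selector.selectedKeys().remove(firstKey); // same effect as iter.remove()
            }

            // Event 2: one byte arrives on the second pipe only.
            second.sink().write(ByteBuffer.wrap(new byte[] { 2 }));
            selector.select(1000);

            int stale = 0;
            for (SelectionKey key : selector.selectedKeys()) {
                int n = ((Pipe.SourceChannel) key.channel()).read(ByteBuffer.allocate(8));
                if (n == 0) {
                    stale++; // the key is in the set, but no new event is behind it
                }
            }
            return new int[] { selector.selectedKeys().size(), stale };
        } finally {
            first.sink().close();
            first.source().close();
            second.sink().close();
            second.source().close();
        }
    }

    public static void main(String[] args) throws IOException {
        int[] kept = run(false);
        System.out.println(kept[0] + " selected, " + kept[1] + " stale");       // 2 selected, 1 stale
        int[] removed = run(true);
        System.out.println(removed[0] + " selected, " + removed[1] + " stale"); // 1 selected, 0 stale
    }
}
```

With a pipe, the stale key only produces a read() that returns 0. With a ServerSocketChannel, the stale accept key produces an accept() that returns null, which is the failure described above.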

Message boundary problem

Actual transmission issues

The text being transferred rarely matches the buffer size exactly. In practice, the following situations can occur:

  • The message is larger than the buffer, so the buffer has to be expanded.
  • Sticky packets: for example, the client sends “AB”, “CD”, and “EF” in three separate writes, but the server receives “ABCDEF” in a single read.
  • Half packets: same as above, but the server receives “ABC” and “DEF”.

There are generally three ways to solve this problem:

  • Fixed message length: every message has the same size, and the server reads exactly that many bytes each time. A sender with less data pads the message up to the agreed length, which wastes bandwidth.
  • Delimiter splitting: each message ends with a delimiter (e.g. ‘\n’). This is inefficient, because the receiver has to check the data byte by byte to find the delimiter.
  • TLV format, that is, Type, Length, and Value: the type and length are sent first, so the receiver knows how many bytes of value to expect before it reads them.
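
To make the third option concrete, here is a minimal sketch of TLV framing on top of ByteBuffer. The frame layout (a 1-byte type, a 4-byte length, then the value), the TYPE_TEXT tag, and the names TlvFramingDemo, encode, and decode are assumptions chosen for illustration, not something the post prescribes. The decoder extracts every complete frame and leaves an incomplete one in the buffer until more bytes arrive, which handles both sticky and half packets.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class TlvFramingDemo {
    static final byte TYPE_TEXT = 1; // hypothetical type tag for UTF-8 text

    // Encodes one message as [type: 1 byte][length: 4 bytes][value: length bytes].
    static byte[] encode(String text) {
        byte[] value = text.getBytes(StandardCharsets.UTF_8);
        ByteBuffer frame = ByteBuffer.allocate(1 + 4 + value.length);
        frame.put(TYPE_TEXT).putInt(value.length).put(value);
        return frame.array();
    }

    // Takes a buffer in write mode, returns all complete messages,
    // and leaves any incomplete frame at the start of the buffer.
    static List<String> decode(ByteBuffer buffer) {
        List<String> messages = new ArrayList<>();
        buffer.flip();
        while (buffer.remaining() >= 5) {
            buffer.mark();
            byte type = buffer.get();
            int length = buffer.getInt();
            if (buffer.remaining() < length) {
                buffer.reset(); // half packet: wait for the rest of the value
                break;
            }
            byte[] value = new byte[length];
            buffer.get(value);
            if (type == TYPE_TEXT) {
                messages.add(new String(value, StandardCharsets.UTF_8));
            }
        }
        buffer.compact();
        return messages;
    }

    public static void main(String[] args) {
        ByteBuffer inbound = ByteBuffer.allocate(64);
        byte[] ab = encode("AB"), cd = encode("CD"), ef = encode("EF");
        // Sticky packet: AB, CD, and the first 3 bytes of EF arrive in one read.
        inbound.put(ab).put(cd).put(ef, 0, 3);
        System.out.println(decode(inbound)); // [AB, CD]
        // The rest of EF arrives later.
        inbound.put(ef, 3, ef.length - 3);
        System.out.println(decode(inbound)); // [EF]
    }
}
```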

NIO’s solution

The second method (delimiter splitting) is used here. When a message is too long to fit into the buffer, a larger buffer is allocated; the contents of the previous buffer are copied into it first, and the rest of the incoming message is then read into it:

The channel’s register method takes a third parameter, the attachment. You can pass any Object, and it is bound to the SelectionKey of the registered channel. The attachment can later be retrieved with the SelectionKey’s attachment() method. Attaching the ByteBuffer this way extends its lifetime to match that of the key.

ByteBuffer buffer = ByteBuffer.allocate(16);
// Add the Buffer attachment corresponding to the channel
socketChannel.register(selector, SelectionKey.OP_READ, buffer);

If a message in the channel is larger than the buffer, the buffer must be expanded. After the buffer has been compacted with its compact method, a position equal to the limit means that nothing could be consumed: the buffer is full and still does not contain a complete message, so it is too small. In that case, create a new buffer with twice the capacity, copy the data from the old buffer into it, and call the SelectionKey’s attach method to install the new buffer as the new attachment.

// If the buffer is too small, expand it
if (buffer.position() == buffer.limit()) {
    ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity()*2);
    buffer.flip();
    // Put the contents of the old buffer into the new buffer
    newBuffer.put(buffer);
    // Attach the new buffer to the key
    key.attach(newBuffer);
}

Each message sent by the client ends with ‘\n’. If a message is smaller than the initial buffer, it can be output directly. If it is larger, the buffer bound to the client channel has to be expanded:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

// ByteBufferUtil.debugAll is a buffer-visualization helper that is not part of the JDK;
// it is assumed to be available on the classpath.
public class SelectorServer {

    private static void split(ByteBuffer buffer) {
        buffer.flip();
        for (int i = 0; i < buffer.limit(); i++) {
            // Traverse to find the delimiter
            // get(i) does not move position
            if (buffer.get(i) == '\n') {
                // Length of the message, including the delimiter
                int length = i + 1 - buffer.position();
                ByteBuffer target = ByteBuffer.allocate(length);
                // Write the message into the target buffer
                for (int j = 0; j < length; j++) {
                    // Copy data from buffer to target
                    target.put(buffer.get());
                }
                // Print the result
                ByteBufferUtil.debugAll(target);
            }
        }
        // Switch back to write mode; the buffer may still hold unread data, so use compact()
        buffer.compact();
    }

    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open(); // create the selector
        try (ServerSocketChannel ssc = ServerSocketChannel.open()) {
            ssc.configureBlocking(false); // Set the channel to non-blocking mode
            SelectionKey ssckey = ssc.register(selector, 0, null); // Register the channel with the selector
            ssckey.interestOps(SelectionKey.OP_ACCEPT); // Subscribe to connection events
            ssc.bind(new InetSocketAddress(8080));
            while (true) {
                int channelnum = selector.select(); // Block until at least one event occurs
                Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); // Get the events that occurred
                while (iter.hasNext()) { // Iterate over unprocessed events
                    SelectionKey key = iter.next(); // Get the SelectionKey of the event
                    iter.remove();
                    if (key.isAcceptable()) { // Connection event
                        ServerSocketChannel channel = (ServerSocketChannel) key.channel(); // Get the channel for the event
                        SocketChannel sc = channel.accept(); // Accept the connection from the client
                        sc.configureBlocking(false);
                        ByteBuffer buffer = ByteBuffer.allocate(16);
                        SelectionKey sckey = sc.register(selector, 0, buffer); // Register the new channel, with the buffer as attachment
                        sckey.interestOps(SelectionKey.OP_READ);
                    } else if (key.isReadable()) {
                        try {
                            SocketChannel channel = (SocketChannel) key.channel(); // get the channel

                            // Get the attachment through the key
                            ByteBuffer buffer = (ByteBuffer) key.attachment();
                            int size = channel.read(buffer); // -1 if the client closed the connection normally
                            if (size == -1) {
                                key.cancel();
                                channel.close();
                                ByteBufferUtil.debugAll(buffer); // visualize the buffer contents
                            } else {
                                split(buffer);
                                if (buffer.position() == buffer.limit()) { // The buffer is full and needs to be expanded
                                    ByteBuffer newbuffer = ByteBuffer.allocate(buffer.capacity() * 2); // Create a new buffer
                                    buffer.flip(); // Switch to read mode
                                    newbuffer.put(buffer); // Put the contents of the original buffer into the new buffer
                                    key.attach(newbuffer); // Replace the attachment with the new, larger buffer
                                }
                            }
                        } catch (IOException e) {
                            e.printStackTrace();
                            key.cancel();
                        }
                    } else if (key.isWritable()) {

                    } else if (key.isConnectable()) {

                    }
                }
            }
        }
    }
}

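
The interplay of delimiter splitting and buffer expansion can also be tried without a socket. The following is a minimal sketch under stated assumptions: the byte arrays passed to onRead stand in for what channel.read would deliver, the buffer field stands in for the key’s attachment, the initial capacity of 4 is chosen only to force an expansion, and the class name DelimiterFramingDemo and its methods are made up for illustration. Unlike the server above, it collects the messages in a list and drops the delimiter instead of printing the raw buffer.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class DelimiterFramingDemo {
    private ByteBuffer buffer = ByteBuffer.allocate(4); // plays the role of the key's attachment
    private final List<String> messages = new ArrayList<>();

    // Simulates read events: 'chunk' is the data that arrives on the channel.
    void onRead(byte[] chunk) {
        int offset = 0;
        while (offset < chunk.length) {
            int n = Math.min(buffer.remaining(), chunk.length - offset);
            buffer.put(chunk, offset, n);
            offset += n;
            split();
            if (!buffer.hasRemaining()) {
                grow(); // still full after compact(): no complete message fits
            }
        }
    }

    // Extracts every '\n'-terminated message, then switches back to write mode.
    private void split() {
        buffer.flip();
        for (int i = buffer.position(); i < buffer.limit(); i++) {
            if (buffer.get(i) == '\n') {
                byte[] line = new byte[i - buffer.position()];
                buffer.get(line);
                buffer.get(); // skip the delimiter
                messages.add(new String(line, StandardCharsets.UTF_8));
            }
        }
        buffer.compact();
    }

    // Doubles the capacity and carries over the unconsumed bytes.
    private void grow() {
        ByteBuffer bigger = ByteBuffer.allocate(buffer.capacity() * 2);
        buffer.flip();
        bigger.put(buffer);
        buffer = bigger;
    }

    List<String> messages() { return messages; }

    int capacity() { return buffer.capacity(); }

    public static void main(String[] args) {
        DelimiterFramingDemo demo = new DelimiterFramingDemo();
        demo.onRead("hello\nwo".getBytes(StandardCharsets.UTF_8)); // one message plus a half packet
        demo.onRead("rld\nhi\n".getBytes(StandardCharsets.UTF_8)); // rest of it plus a sticky packet
        System.out.println(demo.messages()); // [hello, world, hi]
        System.out.println(demo.capacity()); // 8
    }
}
```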

Write events

When the server writes data to a client, the data is first placed in a buffer and then sent through the SocketChannel. When the amount of data is very large, however, a problem appears. To analyze it, let’s look at the following code:

This is the server-side code. It accepts the client’s connection and sends a large amount of data over the channel. The data may not all be transmitted in one call, so the write sits in a while loop that prints the number of bytes written each time.

This is the client code. It creates a buffer to receive the data from the server. Likewise, it cannot receive everything at once, so it prints the number of bytes received after each read:

The results are shown below: the full server output on the left and part of the client output on the right. The server output contains a large number of 0s, which means the server is spinning idly in the while loop. The reason is that the server hands over a large amount of data in its first write (e.g., 4587485 bytes), but the client has limited receiving capacity. Until the client has received that data, the server cannot send any more, so its write calls return 0 and the loop idles.

To solve this idling problem, we can let the selector manage the channel: when the channel can accept more data (i.e. the client has received what was sent earlier), the channel sends another batch; otherwise the thread handles other events or simply blocks.

Based on this analysis, we can design the following approach: after accepting the client’s connection, send the data once. If not everything was sent, register a write event for the channel and let the write-event branch transmit the remaining data. The specific steps are as follows:

  1. Send a large amount of data to the client.
  2. Check the return value, which is the number of bytes actually written.
  3. Determine whether any content is left.
  4. If so, register interest in writable events.
  5. Attach the unsent data to sckey.
  6. Clean up once everything has been written.

Two places in the server code need to be modified, as shown in the figure below.

The first change is to replace the while with an if. If the buffer is not fully transmitted by the first write, we register the write event for this channel and attach the buffer containing the remaining data to its key. This extends the lifetime of the buffer, so the channel can use it in the write-event branch.

The second change is to add code to the write-event branch that writes the rest of the buffer. No while loop is needed here (unlike the first version): because this is a write event, every time the selector detects that the channel is writable, the branch sends more data to the client. Note that buffer.hasRemaining() is checked after each write; once nothing remains, the attached buffer is cleared and interest in the write event is cancelled.

The following figure shows the result on the server side. No 0 appears in the output, which indicates that the idle loop no longer occurs.
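
The write-event pattern can be tried in a single process as well. The following is a minimal sketch, not the server from this post: it assumes a java.nio.channels.Pipe in place of the socket, with a reader thread playing the client, and the class name WriteEventDemo and the method send are made up for illustration. The sender writes once, and if data is left over, it registers OP_WRITE with the unsent buffer as the attachment and continues only when the selector reports the channel as writable.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;

class WriteEventDemo {

    // Sends 'total' bytes through a non-blocking sink.
    // Returns { bytes received by the reader, number of writes that returned 0 }.
    static long[] send(int total) throws IOException, InterruptedException {
        Pipe pipe = Pipe.open();
        long[] received = new long[1];
        // The "client": drains the pipe on its own thread until end of stream.
        Thread reader = new Thread(() -> {
            ByteBuffer in = ByteBuffer.allocate(8192);
            try {
                int n;
                while ((n = pipe.source().read(in)) != -1) {
                    received[0] += n;
                    in.clear();
                }
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
        reader.start();

        Pipe.SinkChannel sink = pipe.sink();
        sink.configureBlocking(false);
        long zeroWrites = 0;
        try (Selector selector = Selector.open()) {
            ByteBuffer data = ByteBuffer.allocate(total);
            sink.write(data); // first attempt: an if, not a while
            if (data.hasRemaining()) {
                // Data is left over: watch for writability and attach the unsent data.
                sink.register(selector, SelectionKey.OP_WRITE, data);
            }
            while (data.hasRemaining()) {
                selector.select(1000); // blocks instead of spinning
                Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                while (iter.hasNext()) {
                    SelectionKey key = iter.next();
                    iter.remove();
                    if (key.isWritable()) {
                        ByteBuffer pending = (ByteBuffer) key.attachment();
                        int written = ((Pipe.SinkChannel) key.channel()).write(pending);
                        if (written == 0) {
                            zeroWrites++;
                        }
                        if (!pending.hasRemaining()) {
                            key.attach(null);   // release the buffer
                            key.interestOps(0); // stop watching for writability
                        }
                    }
                }
            }
        }
        sink.close(); // signals end of stream to the reader
        reader.join();
        return new long[] { received[0], zeroWrites };
    }

    public static void main(String[] args) throws Exception {
        long[] result = send(1 << 20);
        System.out.println("received " + result[0] + " bytes, zero-byte writes: " + result[1]);
    }
}
```

Because the sender writes only after the selector has reported the channel as writable, zero-byte writes should be rare or absent, which corresponds to the missing 0s in the server output described above.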

Server-side read and write complete code

The complete code for the server is as follows:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;

// ByteBufferUtil.debugAll is a buffer-visualization helper that is not part of the JDK;
// it is assumed to be available on the classpath.
public class SelectorServer {

    private static void split(ByteBuffer buffer) {
        buffer.flip();
        for (int i = 0; i < buffer.limit(); i++) {
            // Traverse to find the delimiter
            // get(i) does not move position
            if (buffer.get(i) == '\n') {
                // Length of the message, including the delimiter
                int length = i + 1 - buffer.position();
                ByteBuffer target = ByteBuffer.allocate(length);
                // Write the message into the target buffer
                for (int j = 0; j < length; j++) {
                    // Copy data from buffer to target
                    target.put(buffer.get());
                }
                // Print the result
                ByteBufferUtil.debugAll(target);
            }
        }
        // Switch back to write mode; the buffer may still hold unread data, so use compact()
        buffer.compact();
    }

    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open(); // create the selector
        try (ServerSocketChannel ssc = ServerSocketChannel.open()) {
            ssc.configureBlocking(false); // Set the channel to non-blocking mode
            SelectionKey ssckey = ssc.register(selector, 0, null); // Register the channel with the selector
            ssckey.interestOps(SelectionKey.OP_ACCEPT); // Subscribe to connection events
            ssc.bind(new InetSocketAddress(8080));
            while (true) {
                int channelnum = selector.select(); // Block until at least one event occurs
                Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); // Get the events that occurred
                while (iter.hasNext()) { // Iterate over unprocessed events
                    SelectionKey key = iter.next(); // Get the SelectionKey of the event
                    iter.remove();
                    if (key.isAcceptable()) { // Connection event
                        ServerSocketChannel channel = (ServerSocketChannel) key.channel(); // Get the channel for the event
                        SocketChannel sc = channel.accept(); // Accept the connection from the client
                        sc.configureBlocking(false);
                        ByteBuffer buffer = ByteBuffer.allocate(16);
                        SelectionKey sckey = sc.register(selector, 0, buffer); // Register the new channel, with the buffer as attachment
                        sckey.interestOps(SelectionKey.OP_READ);

                        // 1. Send a large amount of data to the client
                        StringBuffer sb = new StringBuffer();
                        for (int i = 0; i < 9999999; i++) {
                            sb.append('a');
                        }
                        ByteBuffer bb = Charset.defaultCharset().encode(sb.toString());
                        // 2. The return value is the number of bytes actually written
                        int written = sc.write(bb);
                        System.out.println(written);
                        // 3. Determine whether any content is left
                        if (bb.hasRemaining()) {
                            // 4./5. Watch for writable events and attach the unsent data.
                            // Registering again updates the existing key: the interest set becomes
                            // OP_WRITE and the attachment becomes bb, replacing the read buffer.
                            sc.register(selector, SelectionKey.OP_WRITE, bb);
                        }
                    } else if (key.isReadable()) {
                        try {
                            SocketChannel channel = (SocketChannel) key.channel(); // get the channel

                            // Get the attachment through the key
                            ByteBuffer buffer = (ByteBuffer) key.attachment();
                            int size = channel.read(buffer); // -1 if the client closed the connection normally
                            if (size == -1) {
                                key.cancel();
                                channel.close();
                                ByteBufferUtil.debugAll(buffer); // visualize the buffer contents
                            } else {
                                split(buffer);
                                if (buffer.position() == buffer.limit()) { // The buffer is full and needs to be expanded
                                    ByteBuffer newbuffer = ByteBuffer.allocate(buffer.capacity() * 2); // Create a new buffer
                                    buffer.flip(); // Switch to read mode
                                    newbuffer.put(buffer); // Put the contents of the original buffer into the new buffer
                                    key.attach(newbuffer); // Replace the attachment with the new, larger buffer
                                }
                            }
                        } catch (IOException e) {
                            e.printStackTrace();
                            key.cancel();
                        }
                    } else if (key.isWritable()) {
                        SocketChannel writechannel = (SocketChannel) key.channel();
                        // Get the buffer that contains the remaining data that has not been transferred
                        ByteBuffer buffer = (ByteBuffer) key.attachment();
                        int writesize = writechannel.write(buffer);
                        System.out.println(writesize);
                        // 6. Clean up once everything has been written
                        if (!buffer.hasRemaining()) {
                            // Clear the associated buffer
                            key.attach(null);
                            // Cancel interest in the write event
                            key.interestOps(0);
                        }
                    } else if (key.isConnectable()) {

                    }
                }
            }
        }
    }
}