In the previous two articles, the question of how to implement concurrent processing of clients for TCP or UDP servers was left.

The intuitive idea would be to create a single thread for each incoming request, but this would be a waste of resources. Instead, use a thread pool to manage the threads to save resources. Take the TCP server as an example.

You first need to define a task that needs to be submitted to the thread pool.

public class TCPRunnable implements Runnable {
    private Socket mSocket;

    public TCPRunnable(Socket socket) {
        mSocket = socket;
    }

    @Override
    public void run(a) {
        try {
            System.out.println("Handling client: " + mSocket.getRemoteSocketAddress());
            InputStream in = mSocket.getInputStream();
            OutputStream out = mSocket.getOutputStream();
            BufferedReader br = new BufferedReader(new InputStreamReader(in));
            String line;
            System.out.println("Client said: ");
            while((line = br.readLine()) ! =null) {
                System.out.println(line);
            }
            out.write("Welcome!".getBytes());
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                if(mSocket ! =null) { mSocket.close(); }}catch(IOException e) { e.printStackTrace(); }}}}Copy the code

In the constructor, you need to pass in a Socket instance, and when the task is submitted to the thread pool, the Socket reads and writes are performed in the asynchronous thread.

The server side can now be improved so that it only needs to submit the task after obtaining the Socket instance

public class TCPServer1 {
    public static void main(String[] args) {
        ExecutorService mThreadPool = Executors.newCachedThreadPool();
        ServerSocket serverSocket = null;
        try {
            serverSocket = new ServerSocket(8890);
            while (true) {
                Socket socket = serverSocket.accept();
                mThreadPool.execute(newTCPRunnable(socket)); }}catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                if(serverSocket ! =null) { serverSocket.close(); }}catch(IOException e) { e.printStackTrace(); }}}}Copy the code

Using a thread pool seems perfect, but now consider that if a client wants to maintain a persistent connection to the server, then obviously a thread pool also limits the number of concurrent accesses by the client, since there are only a few core threads. How about increasing the number of core threads in the thread pool? It could be, but by how much? With millions of clients out there, you can’t choose! And increasing the number of threads only brings more thread overhead, including thread scheduling and context switching. At the same time, we also face a constant, that is multi-threaded access to critical resources, we need to synchronize or lock, these hidden overhead is beyond the developer’s control.

The arrival of Java NIO solves these problems and allows servers to handle thousands of clients simultaneously while still performing well. So in this article, we will explore what is strong about NIO.

Channel

NIO uses channels to send and receive data, rather than traditional streams (InputStream/OutputStream).

A Channel instance represents the opening of a connection to an entity such as a hardware device, a file, a network socket, and so on. A feature of a Channel is that operations on a Channel, such as reading and writing, are thread-safe.

SelectableChannel

SelectableChannel is an abstract class that implements the Channel interface. This class is special.

First, SelectableChannel can be in blocking or non-blocking mode. In blocking mode, any I/O operation on this channel is blocked until the I/O completes. In non-blocking mode, any I/O on the channel is not blocked, but the number of bytes transmitted may be fewer or none than originally requested.

And secondly SelectableChannel can be used to multiplexe by selectors, . But first you need to call selectableChannel configureBlocking (false) is adjusted for non-blocking mode (from mode), it is very important. And then you register

SelectionKey register(Selector sel, int ops)
SelectionKey register(Selector sel, int ops, Object att)
Copy the code

The first argument represents the Selector instance to register. We’ll talk about selectors later.

The second parameter represents the operation of interest to this channel, which is defined in the SelectionKey class, as follows

public static final int OP_READ = 1 << 0;
public static final int OP_WRITE = 1 << 2;
public static final int OP_CONNECT = 1 << 3;
public static final int OP_ACCEPT = 1 << 4;
Copy the code

For a SocketChannel, the only operations it is interested in are OP_READ, OP_WIRTE, and OP_CONNECT; however, it does not include OP_ACCEPT. ServerSocketChannel can be interested in all four operations. Why? Because only ServerSocketChannel has an accPET () method.

SocketChannel and ServerSocketChannel are subclasses of SelectableChannel.

The third argument, Object att, is an attachment that you can bring with you when you register.

The register() method returns a SelectionKey instance. The SelectionKey is equivalent to a Java Bean. It is a container for the three parameters of register(), which it returns and sets

Selector selector(a);
int interestOps(a);
Object attachment(a)
Copy the code

SocketChannel

SocketChannel indicates a socket channel.

A SocketChannel instance is created through its static open() method

    public static SocketChannel open(a) throws IOException {
        return SelectorProvider.provider().openSocketChannel();
    }

    public static SocketChannel open(SocketAddress remote)
        throws IOException
    {
        // 1. ceate socket channel
        SocketChannel sc = open();
        try {
            // 2. connect channel's socket, blocking until connected or error
            sc.connect(remote);
        } catch (Throwable x) {
            try {
                sc.close();
            } catch (Throwable suppressed) {
                x.addSuppressed(suppressed);
            }
            throw x;
        }
        assert sc.isConnected();
        return sc;
    }    
Copy the code

While the open() method merely creates a SocketChannel object, open(SocketAddress Remote) goes a step further by calling connect(ADDR) to connect to the server.

SocketChannel is a subclass of SelectableChannel. Remember the earlier feature of SelectableChannel? If the blocking mode is not configured, then the SocketChannel object is blocked by default, and the open(SocketAddress Remote) method is blocked to open the server connection. And any I/O operation on a SocketChannel is blocking.

So since SelectableChannel can be used in non-blocking mode without blocking any I/O operation, we can first call the open() method with no parameters, then configure it to non-blocking mode, and then make a connection, which is a non-blocking connection. The pseudo-code is as follows

// Create a SocketChannel instance
SocketChannel sc = SocketChannel.open();
// Adjust to non-blocking mode
sr.configureBlocking(false);
// Connect to the server
sr.connect(remoteAddr);
Copy the code

While the connect() method is non-blocking, we can use the isConnectionPending() method to check whether the connection is still pending. If the connection is pending, we can do something else instead of blocking until the connection is established. Here we can see the benefits of using NIO.

If isConnectionPending() returns false, the connection has been established, but we also need to call finishConnect() to complete the connection.

Implement the client with SocketChannel

public class NonBlockingTCPClient {
    public static void main(String[] args) {
        byte[] data = "hello".getBytes();
        SocketChannel channel = null;
        try {
            // 1. open a socket channel
            channel = SocketChannel.open();
            // adjust to be nonblocking
            channel.configureBlocking(false);
            // 2. init connection to server and repeatedly poll with complete
            // connect() and finishConnect() are nonblocking operation, both return immediately
            if(! channel.connect(new InetSocketAddress(InetAddress.getLocalHost(), 8899))) {
                while(! channel.finishConnect()) { System.out.print(".");
                }
            }

            System.out.println("Connected to server...");

            ByteBuffer writeBuffer = ByteBuffer.wrap(data);
            ByteBuffer readBuffer = ByteBuffer.allocate(data.length);
            int totalBytesReceived = 0;
            int bytesReceived;
            // 3. read and write bytes
            while (totalBytesReceived < data.length) {
                if (writeBuffer.hasRemaining()) {
                    channel.write(writeBuffer);
                }
                if ((bytesReceived = channel.read(readBuffer)) == -1) {
                    throw new SocketException("Connection closed prematurely");
                }
                totalBytesReceived += bytesReceived;
                System.out.print(".");
            }
            System.out.println("Server said: " + new String(readBuffer.array()));
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            // 4 .close socket channel
            try {
                if(channel ! =null) { channel.close(); }}catch(IOException e) { e.printStackTrace(); }}}}Copy the code

First, create an instance of SocketChannel and configure it to be in non-blocking mode. Only in non-blocking mode will any I/O operation on the instance of SocketChannel be non-blocking. This makes our client a non-blocking client and improves client performance.

The second step is to connect to the server using the connect() method while using the while loop to continuously detect and fully connect. We don’t have to wait so long, but this is just to show you how to connect. You must use finishConnect() to complete the connection process before you need to perform I/O immediately.

The third step is to read and write bytes using a ByteBuffer. Why do we keep reading and writing with a while loop? Remember when we talked about SelectableChannel being non-blocking? If a SelectableChannel is in non-blocking mode, its I/O operations may read and write fewer bytes than they actually are, or none at all. So what we’re going to do here is we’re going to loop in and out, and we’re going to make sure that the read and write is complete.

Socketchannel.write () : A socket channel in non-blocking mode, for example, cannot write any more bytes than are free in the socket’s output buffer.

ServerSocketChannel

The ServerSocketChannel class represents a server-socket channel.

ServerSocketChannel, like SocktChannel, needs to be created using the static open() method. Once created, it needs to be bound to the local IP address and port using the bind() method

ServerSocketChannel bind(SocketAddress local)
ServerSocketChannel bind(SocketAddress local, int limitQueue)
Copy the code

The SocketAddress local parameter represents the local IP address and port number, and the int limitQueue parameter limits the number of connections.

Selector

A Selector is a multiplexer of a SelectableChannel. Multiple SelectableChannel can be managed with a single Selector. For example, we can use selectors to manage multiple ServerSocketchannels in one thread, so we can listen for multiple port requests in a single thread, which is beautiful. Here, too, we can see the benefits of using NIO.

Creating a Selector Instance

The Selector instance also needs to be created using the static open() method.

Registered SelectableChannel

As I said before, we need to register the Selector by calling register() of the SelectableChannel, which will return a SelctionKey to represent the registration.

Select the channel

As mentioned earlier, you can manage multiple SelectableChannel by Selector. Its select() method monitors which channels are ready for I/O, and the return value represents the number of these I/ OS.

int select(a)
int select(long timeout)
int selectNow(a)
Copy the code

When the select() method is called, it stores the selectionkeys representing the channels that are ready for I/O operations in a collection that can be returned by selectedKeys().

Set<SelectionKey> selectedKeys(a)
Copy the code

The three methods of select() are named differently. The first method blocks the call, the third method sets a timeout, and the third method returns immediately.

wakeUp()

If calling selcet() causes the thread to block, or even block indefinitely, the wakeUp() method wakes up threads that are blocked by calling select().

Implement the server using selectors and ServerSocketChannel

package com.ckt.sockettest;


import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class TCPChannelServer {
    public static void main(String[] args) {
        Selector selector = null;
        try {
            // 1. open a selector
            selector = Selector.open();
            // 2. listen for server socket channel
            ServerSocketChannel ssc = ServerSocketChannel.open();
            // must to be nonblocking mode before register
            ssc.configureBlocking(false);
            // bind server socket channel to port 8899
            ssc.bind(new InetSocketAddress(8899));
            // 3. register it with selector
            ssc.register(selector, SelectionKey.OP_ACCEPT);

            while (true) { // run forever
                // 4. select ready SelectionKey for I/O operation
                if (selector.select(3000) = =0) {
                    continue;
                }
                // 5. get selected keys
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
                // 6. handle selected key's interest operations
                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();

                    if (key.isAcceptable()) {
                        ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
                        // get socket channel from server socket channel
                        SocketChannel clientChannel = serverSocketChannel.accept();
                        // must to be nonblocking before register with selector
                        clientChannel.configureBlocking(false);
                        // register socket channel to selector with OP_READ
                        clientChannel.register(key.selector(), SelectionKey.OP_READ);
                    }

                    if (key.isReadable()) {
                        // read bytes from socket channel to byte buffer
                        SocketChannel clientChannel = (SocketChannel) key.channel();
                        ByteBuffer readBuffer = ByteBuffer.allocate(10);
                        int readBytes = clientChannel.read(readBuffer);
                        if (readBytes == -1) {
                            System.out.println("closed.......");
                            clientChannel.close();
                        } else if (readBytes > 0) {
                            String s = new String(readBuffer.array());
                            System.out.println("Client said: " + s);
                            if (s.trim().equals("Hello")) {
                                // attachment is content used to write
                                key.interestOps(SelectionKey.OP_WRITE);
                                key.attach("Welcome!!!"); }}}if (key.isValid() && key.isWritable()) {
                        SocketChannel clientChannel = (SocketChannel) key.channel();
                        // get content from attachment
                        String content = (String) key.attachment();
                        // write content to socket channel
                        clientChannel.write(ByteBuffer.wrap(content.getBytes()));
                        key.interestOps(SelectionKey.OP_READ);
                    }

                    // remove handled key from selected keysiterator.remove(); }}}catch (IOException e) {
            e.printStackTrace();
        } finally {
            // close selector
            if(selector ! =null) {
                try {
                    selector.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
Copy the code

The first step is to create the Selector instance.

Step 2: Create an instance of ServerSocketChannel, set it to non-blocking mode, and bind it to the local port.

Third, register the ServerSocketChannel instance with the Selector instance.

Step 4: Select some channels that are ready for I/O operations, with a 3-second timeout, which means blocking for 3 seconds.

Fifth, get the collection of selected selectionKeys.

Step 6, handle the operation of interest on the SelectionKey. The serverSocketChannel that is registered with the Selector must only be isAcceptable(), so that we can obtain the request SocketChannel instance from the client through its Accept () method. Then register the socketChannel with a selector and make it readable. So the next time you walk through the selectionKeys, you can handle that readable operation.

conclusion

In three articles, the outline of Java Sockets has been outlined. However, I have not been exposed to this content in my actual work, so these three articles are only a superficial introduction. If I have the opportunity to further study in the future, I will improve the content of these articles.