In the previous two articles, the question of how to implement concurrent processing of clients for TCP or UDP servers was left.
The intuitive idea would be to create a single thread for each incoming request, but this would be a waste of resources. Instead, use a thread pool to manage the threads to save resources. Take the TCP server as an example.
You first need to define a task that needs to be submitted to the thread pool.
public class TCPRunnable implements Runnable {
private Socket mSocket;
public TCPRunnable(Socket socket) {
mSocket = socket;
}
@Override
public void run(a) {
try {
System.out.println("Handling client: " + mSocket.getRemoteSocketAddress());
InputStream in = mSocket.getInputStream();
OutputStream out = mSocket.getOutputStream();
BufferedReader br = new BufferedReader(new InputStreamReader(in));
String line;
System.out.println("Client said: ");
while((line = br.readLine()) ! =null) {
System.out.println(line);
}
out.write("Welcome!".getBytes());
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
if(mSocket ! =null) { mSocket.close(); }}catch(IOException e) { e.printStackTrace(); }}}}Copy the code
In the constructor, you need to pass in a Socket instance, and when the task is submitted to the thread pool, the Socket reads and writes are performed in the asynchronous thread.
The server side can now be improved so that it only needs to submit the task after obtaining the Socket instance
public class TCPServer1 {
public static void main(String[] args) {
ExecutorService mThreadPool = Executors.newCachedThreadPool();
ServerSocket serverSocket = null;
try {
serverSocket = new ServerSocket(8890);
while (true) {
Socket socket = serverSocket.accept();
mThreadPool.execute(newTCPRunnable(socket)); }}catch (IOException e) {
e.printStackTrace();
} finally {
try {
if(serverSocket ! =null) { serverSocket.close(); }}catch(IOException e) { e.printStackTrace(); }}}}Copy the code
Using a thread pool seems perfect, but now consider that if a client wants to maintain a persistent connection to the server, then obviously a thread pool also limits the number of concurrent accesses by the client, since there are only a few core threads. How about increasing the number of core threads in the thread pool? It could be, but by how much? With millions of clients out there, you can’t choose! And increasing the number of threads only brings more thread overhead, including thread scheduling and context switching. At the same time, we also face a constant, that is multi-threaded access to critical resources, we need to synchronize or lock, these hidden overhead is beyond the developer’s control.
The arrival of Java NIO solves these problems and allows servers to handle thousands of clients simultaneously while still performing well. So in this article, we will explore what is strong about NIO.
Channel
NIO uses channels to send and receive data, rather than traditional streams (InputStream/OutputStream).
A Channel instance represents the opening of a connection to an entity such as a hardware device, a file, a network socket, and so on. A feature of a Channel is that operations on a Channel, such as reading and writing, are thread-safe.
SelectableChannel
SelectableChannel is an abstract class that implements the Channel interface. This class is special.
First, SelectableChannel can be in blocking or non-blocking mode. In blocking mode, any I/O operation on this channel is blocked until the I/O completes. In non-blocking mode, any I/O on the channel is not blocked, but the number of bytes transmitted may be fewer or none than originally requested.
And secondly SelectableChannel can be used to multiplexe by selectors, . But first you need to call selectableChannel configureBlocking (false) is adjusted for non-blocking mode (from mode), it is very important. And then you register
SelectionKey register(Selector sel, int ops)
SelectionKey register(Selector sel, int ops, Object att)
Copy the code
The first argument represents the Selector instance to register. We’ll talk about selectors later.
The second parameter represents the operation of interest to this channel, which is defined in the SelectionKey class, as follows
public static final int OP_READ = 1 << 0;
public static final int OP_WRITE = 1 << 2;
public static final int OP_CONNECT = 1 << 3;
public static final int OP_ACCEPT = 1 << 4;
Copy the code
For a SocketChannel, the only operations it is interested in are OP_READ, OP_WIRTE, and OP_CONNECT; however, it does not include OP_ACCEPT. ServerSocketChannel can be interested in all four operations. Why? Because only ServerSocketChannel has an accPET () method.
SocketChannel and ServerSocketChannel are subclasses of SelectableChannel.
The third argument, Object att, is an attachment that you can bring with you when you register.
The register() method returns a SelectionKey instance. The SelectionKey is equivalent to a Java Bean. It is a container for the three parameters of register(), which it returns and sets
Selector selector(a);
int interestOps(a);
Object attachment(a)
Copy the code
SocketChannel
SocketChannel indicates a socket channel.
A SocketChannel instance is created through its static open() method
public static SocketChannel open(a) throws IOException {
return SelectorProvider.provider().openSocketChannel();
}
public static SocketChannel open(SocketAddress remote)
throws IOException
{
// 1. ceate socket channel
SocketChannel sc = open();
try {
// 2. connect channel's socket, blocking until connected or error
sc.connect(remote);
} catch (Throwable x) {
try {
sc.close();
} catch (Throwable suppressed) {
x.addSuppressed(suppressed);
}
throw x;
}
assert sc.isConnected();
return sc;
}
Copy the code
While the open() method merely creates a SocketChannel object, open(SocketAddress Remote) goes a step further by calling connect(ADDR) to connect to the server.
SocketChannel is a subclass of SelectableChannel. Remember the earlier feature of SelectableChannel? If the blocking mode is not configured, then the SocketChannel object is blocked by default, and the open(SocketAddress Remote) method is blocked to open the server connection. And any I/O operation on a SocketChannel is blocking.
So since SelectableChannel can be used in non-blocking mode without blocking any I/O operation, we can first call the open() method with no parameters, then configure it to non-blocking mode, and then make a connection, which is a non-blocking connection. The pseudo-code is as follows
// Create a SocketChannel instance
SocketChannel sc = SocketChannel.open();
// Adjust to non-blocking mode
sr.configureBlocking(false);
// Connect to the server
sr.connect(remoteAddr);
Copy the code
While the connect() method is non-blocking, we can use the isConnectionPending() method to check whether the connection is still pending. If the connection is pending, we can do something else instead of blocking until the connection is established. Here we can see the benefits of using NIO.
If isConnectionPending() returns false, the connection has been established, but we also need to call finishConnect() to complete the connection.
Implement the client with SocketChannel
public class NonBlockingTCPClient {
public static void main(String[] args) {
byte[] data = "hello".getBytes();
SocketChannel channel = null;
try {
// 1. open a socket channel
channel = SocketChannel.open();
// adjust to be nonblocking
channel.configureBlocking(false);
// 2. init connection to server and repeatedly poll with complete
// connect() and finishConnect() are nonblocking operation, both return immediately
if(! channel.connect(new InetSocketAddress(InetAddress.getLocalHost(), 8899))) {
while(! channel.finishConnect()) { System.out.print(".");
}
}
System.out.println("Connected to server...");
ByteBuffer writeBuffer = ByteBuffer.wrap(data);
ByteBuffer readBuffer = ByteBuffer.allocate(data.length);
int totalBytesReceived = 0;
int bytesReceived;
// 3. read and write bytes
while (totalBytesReceived < data.length) {
if (writeBuffer.hasRemaining()) {
channel.write(writeBuffer);
}
if ((bytesReceived = channel.read(readBuffer)) == -1) {
throw new SocketException("Connection closed prematurely");
}
totalBytesReceived += bytesReceived;
System.out.print(".");
}
System.out.println("Server said: " + new String(readBuffer.array()));
} catch (IOException e) {
e.printStackTrace();
} finally {
// 4 .close socket channel
try {
if(channel ! =null) { channel.close(); }}catch(IOException e) { e.printStackTrace(); }}}}Copy the code
First, create an instance of SocketChannel and configure it to be in non-blocking mode. Only in non-blocking mode will any I/O operation on the instance of SocketChannel be non-blocking. This makes our client a non-blocking client and improves client performance.
The second step is to connect to the server using the connect() method while using the while loop to continuously detect and fully connect. We don’t have to wait so long, but this is just to show you how to connect. You must use finishConnect() to complete the connection process before you need to perform I/O immediately.
The third step is to read and write bytes using a ByteBuffer. Why do we keep reading and writing with a while loop? Remember when we talked about SelectableChannel being non-blocking? If a SelectableChannel is in non-blocking mode, its I/O operations may read and write fewer bytes than they actually are, or none at all. So what we’re going to do here is we’re going to loop in and out, and we’re going to make sure that the read and write is complete.
Socketchannel.write () : A socket channel in non-blocking mode, for example, cannot write any more bytes than are free in the socket’s output buffer.
ServerSocketChannel
The ServerSocketChannel class represents a server-socket channel.
ServerSocketChannel, like SocktChannel, needs to be created using the static open() method. Once created, it needs to be bound to the local IP address and port using the bind() method
ServerSocketChannel bind(SocketAddress local)
ServerSocketChannel bind(SocketAddress local, int limitQueue)
Copy the code
The SocketAddress local parameter represents the local IP address and port number, and the int limitQueue parameter limits the number of connections.
Selector
A Selector is a multiplexer of a SelectableChannel. Multiple SelectableChannel can be managed with a single Selector. For example, we can use selectors to manage multiple ServerSocketchannels in one thread, so we can listen for multiple port requests in a single thread, which is beautiful. Here, too, we can see the benefits of using NIO.
Creating a Selector Instance
The Selector instance also needs to be created using the static open() method.
Registered SelectableChannel
As I said before, we need to register the Selector by calling register() of the SelectableChannel, which will return a SelctionKey to represent the registration.
Select the channel
As mentioned earlier, you can manage multiple SelectableChannel by Selector. Its select() method monitors which channels are ready for I/O, and the return value represents the number of these I/ OS.
int select(a)
int select(long timeout)
int selectNow(a)
Copy the code
When the select() method is called, it stores the selectionkeys representing the channels that are ready for I/O operations in a collection that can be returned by selectedKeys().
Set<SelectionKey> selectedKeys(a)
Copy the code
The three methods of select() are named differently. The first method blocks the call, the third method sets a timeout, and the third method returns immediately.
wakeUp()
If calling selcet() causes the thread to block, or even block indefinitely, the wakeUp() method wakes up threads that are blocked by calling select().
Implement the server using selectors and ServerSocketChannel
package com.ckt.sockettest;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
public class TCPChannelServer {
public static void main(String[] args) {
Selector selector = null;
try {
// 1. open a selector
selector = Selector.open();
// 2. listen for server socket channel
ServerSocketChannel ssc = ServerSocketChannel.open();
// must to be nonblocking mode before register
ssc.configureBlocking(false);
// bind server socket channel to port 8899
ssc.bind(new InetSocketAddress(8899));
// 3. register it with selector
ssc.register(selector, SelectionKey.OP_ACCEPT);
while (true) { // run forever
// 4. select ready SelectionKey for I/O operation
if (selector.select(3000) = =0) {
continue;
}
// 5. get selected keys
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
// 6. handle selected key's interest operations
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
if (key.isAcceptable()) {
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
// get socket channel from server socket channel
SocketChannel clientChannel = serverSocketChannel.accept();
// must to be nonblocking before register with selector
clientChannel.configureBlocking(false);
// register socket channel to selector with OP_READ
clientChannel.register(key.selector(), SelectionKey.OP_READ);
}
if (key.isReadable()) {
// read bytes from socket channel to byte buffer
SocketChannel clientChannel = (SocketChannel) key.channel();
ByteBuffer readBuffer = ByteBuffer.allocate(10);
int readBytes = clientChannel.read(readBuffer);
if (readBytes == -1) {
System.out.println("closed.......");
clientChannel.close();
} else if (readBytes > 0) {
String s = new String(readBuffer.array());
System.out.println("Client said: " + s);
if (s.trim().equals("Hello")) {
// attachment is content used to write
key.interestOps(SelectionKey.OP_WRITE);
key.attach("Welcome!!!"); }}}if (key.isValid() && key.isWritable()) {
SocketChannel clientChannel = (SocketChannel) key.channel();
// get content from attachment
String content = (String) key.attachment();
// write content to socket channel
clientChannel.write(ByteBuffer.wrap(content.getBytes()));
key.interestOps(SelectionKey.OP_READ);
}
// remove handled key from selected keysiterator.remove(); }}}catch (IOException e) {
e.printStackTrace();
} finally {
// close selector
if(selector ! =null) {
try {
selector.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
Copy the code
The first step is to create the Selector instance.
Step 2: Create an instance of ServerSocketChannel, set it to non-blocking mode, and bind it to the local port.
Third, register the ServerSocketChannel instance with the Selector instance.
Step 4: Select some channels that are ready for I/O operations, with a 3-second timeout, which means blocking for 3 seconds.
Fifth, get the collection of selected selectionKeys.
Step 6, handle the operation of interest on the SelectionKey. The serverSocketChannel that is registered with the Selector must only be isAcceptable(), so that we can obtain the request SocketChannel instance from the client through its Accept () method. Then register the socketChannel with a selector and make it readable. So the next time you walk through the selectionKeys, you can handle that readable operation.
conclusion
In three articles, the outline of Java Sockets has been outlined. However, I have not been exposed to this content in my actual work, so these three articles are only a superficial introduction. If I have the opportunity to further study in the future, I will improve the content of these articles.