Linux’s network model: recv, select, epoll, and so on. The fundamental point is that the so-called BIO and NIO implemented on the server side are really just Java’s encapsulation of what the operating system provides: Java calls native methods, written in C, that in turn invoke the operating system kernel to receive and send requests.
BIO
Java’s Socket wrapper class ultimately calls the operating system’s recv method to receive data. Let’s look at the call chain from Java code down to the Linux socket API. For the detailed call chain, see the article Java Socket and Linux Socket underlying call analysis.
ServerSocket server = new ServerSocket(8000);
Socket client = server.accept();
InputStream in = client.getInputStream();
byte[] bytes = new byte[1024];
int len = in.read(bytes);
String data = new String(bytes, 0 , len);
System.out.println("Receive client message:" + data);
OutputStream out = client.getOutputStream();
out.write("Hi".getBytes());
client.close();
In fact, the key part is the read logic. The Java socket reads through an InputStream whose read method calls socketRead, which in turn calls the native method socketRead0(). The native method ultimately invokes the recv system call provided by the operating system kernel to read data. Compare this with the recv model in the previous article and you will see that the Java reader thread blocks all the way down into the kernel: recv does not return until the NIC has received the data and raised an interrupt to the CPU, at which point recv returns and processing continues.
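Summarized as a comment sketch (following the description above, not verbatim JDK source):
// InputStream.read(byte[])                 (application code)
//   -> SocketInputStream.socketRead(...)   (JDK)
//     -> native socketRead0(...)           (JNI, C code)
//       -> recv(fd, buf, len, 0)           (kernel; the calling thread blocks here until data arrives)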
As for the Java implementation of BIO, it simply handles each request by spawning a new thread. Here’s an example adapted from The Definitive Guide to Netty.
import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;

public class TimeServerBio {
public static void main(String[] args) throws IOException {
int port = 9888;
ServerSocket serverSocket = new ServerSocket(port);
Socket socket = null;
while (true) {
socket = serverSocket.accept();
new Thread(new TimeHandler(socket)).start();
}
}
}
class TimeHandler implements Runnable {
private Socket socket;
public TimeHandler(Socket socket) {
this.socket = socket;
}
public void run() {
InputStreamReader is = null;
BufferedReader bufferedReader = null;
PrintWriter os = null;
String cinfo = null;
try {
is = new InputStreamReader(socket.getInputStream());
bufferedReader = new BufferedReader(is);
while ((cinfo = bufferedReader.readLine()) != null) {
System.out.println(cinfo);
}
socket.shutdownInput();
os = new PrintWriter(socket.getOutputStream());
os.write("Accepted!!!!!");
os.flush();
socket.shutdownOutput();
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
is.close();
os.close();
bufferedReader.close();
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
As you can see, the handling is very simple: spin up a new Thread() for each connection and read the data through a stream. The obvious disadvantage is that threads are created without bound; too many threads cause excessive context switching, so this model cannot support a large number of concurrent connections.
Pseudo-asynchronous I/O
Pseudo-asynchronous I/O simply adds a thread pool, which avoids creating threads without limit. The code barely changes.
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class TimeServerExcutor {
public static void main(String[] args) throws IOException {
int port = 9888;
ServerSocket serverSocket = new ServerSocket(port);
Socket socket = null;
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 100, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(100));
while (true) {
socket = serverSocket.accept();
threadPoolExecutor.execute(new TimeHandler(socket));
}
}
}
In practice this does not solve the underlying problem: if there are too many requests, tasks simply pile up in the thread pool’s blocking queue and responses slow down; and a worker thread stuck transferring a very large packet cannot pick up the tasks waiting in the queue until that transfer completes.
Java NIO
Java’s NIO package ultimately calls the operating system’s multiplexing APIs. On Linux, for example, the multiplexing implementation is epoll. Because Java is cross-platform, the JDK chooses a different kernel API depending on the operating system, such as kqueue on BSD/macOS.
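As a quick check, you can print which implementation the JDK picked on the current machine. This is a small illustrative snippet using only the standard API; the class names in the comments are what you would typically see on Linux:

import java.nio.channels.Selector;
import java.nio.channels.spi.SelectorProvider;

public class ShowSelectorImpl {
    public static void main(String[] args) throws Exception {
        // e.g. sun.nio.ch.EPollSelectorProvider on Linux
        System.out.println(SelectorProvider.provider().getClass().getName());
        // e.g. sun.nio.ch.EPollSelectorImpl
        try (Selector selector = Selector.open()) {
            System.out.println(selector.getClass().getName());
        }
    }
}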
The following analysis references the article Java NIO Analysis (8): High-Concurrency Core Selector Details.
Here’s the server-side code.
public class TimeServerNIO {
public static void main(String[] args) {
int port = 9888;
MultiplexerTimeServer timeServer = new MultiplexerTimeServer(port);
new Thread(timeServer, "Nio-multipleTimeServer-001").start();
}
}
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;

public class MultiplexerTimeServer implements Runnable {
private Selector selector;
private ServerSocketChannel serverSocketChannel;
private volatile boolean stop;
public MultiplexerTimeServer(int port) {
try {
selector = Selector.open();
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.socket().bind(new InetSocketAddress("127.0.0.1", port));
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
stop = false;
} catch (IOException e) {
e.printStackTrace();
}
}
public void run() {
while (!stop) {
try {
selector.select(1000);
Set<SelectionKey> selectionKeySet = selector.selectedKeys();
for (Iterator<SelectionKey> iterator = selectionKeySet.iterator(); iterator.hasNext();) {
SelectionKey key = iterator.next();
// selected keys must be removed manually, otherwise they are reported again on the next select
iterator.remove();
this.handleInput(key);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
private void handleInput(SelectionKey key) throws IOException {
if (key.isValid()) {
if (key.isAcceptable()) {
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
}
if (key.isReadable()) {
SocketChannel socketChannel = (SocketChannel) key.channel();
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
int readBytes = socketChannel.read(readBuffer);
if (readBytes > 0) {
readBuffer.flip();
byte[] bytes = new byte[readBuffer.remaining()];
readBuffer.get(bytes);
String body = new String(bytes, "UTF-8");
System.out.println(body);
this.doWrite(socketChannel);
}
}
}
}
private void doWrite(SocketChannel socketChannel) throws IOException {
byte[] bytes = "accepted!!".getBytes("UTF-8"); ByteBuffer byteBuffer = ByteBuffer.allocate(bytes.length); byteBuffer.put(bytes); byteBuffer.flip(); socketChannel.write(byteBuffer); }}Copy the code
In the constructor of MultiplexerTimeServer we create a Selector, whose main job is to dispatch different event types to the corresponding channels. The constructor also creates a ServerSocketChannel that only cares about OP_ACCEPT, the socket connection event.
The Selector.open() method ends up calling the operating system kernel’s epoll_create, which creates the eventpoll object described in the previous article. The MultiplexerTimeServer’s run() method then calls selector.select(), which ends up calling epoll_wait.
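Put differently, the mapping this article walks through can be summarized as follows (Linux, HotSpot-era JDK; shown as comments, not literal source):
// Selector.open()       -> EPollSelectorImpl constructor -> epoll_create()
// channel.register(...) -> interest recorded, applied later by updateRegistrations() via epoll_ctl()
// selector.select()     -> EPollSelectorImpl.doSelect() -> EPollArrayWrapper.poll() -> epoll_wait()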
Selector creation process
Let’s take a look at the details.
// java.nio.channels.Selector
public static Selector open() throws IOException {
// Find the provider first, then open the Selector
return SelectorProvider.provider().openSelector();
}
// java.nio.channels.spi.SelectorProvider
public static SelectorProvider provider() {
synchronized (lock) {
if (provider != null)
return provider;
return AccessController.doPrivileged(
new PrivilegedAction<SelectorProvider>() {
public SelectorProvider run() {
if (loadProviderFromProperty())
return provider;
if (loadProviderAsService())
return provider;
// Here is where the Selector provider is really created
provider = sun.nio.ch.DefaultSelectorProvider.create();
return provider;
}
});
}
}
This calls the create method in the JDK, which selects a provider depending on the operating system.
/**
* Returns the default SelectorProvider.
*/
public static SelectorProvider create() {
// Get the OS name
String osname = AccessController.doPrivileged(new GetPropertyAction("os.name"));
// Create a different SelectorProvider depending on the name
if (osname.equals("SunOS"))
return createProvider("sun.nio.ch.DevPollSelectorProvider");
if (osname.equals("Linux"))
return createProvider("sun.nio.ch.EPollSelectorProvider");
return new sun.nio.ch.PollSelectorProvider();
}
If you look at the Linux implementation, you can immediately see the use of epoll.
public AbstractSelector openSelector() throws IOException {
return new EPollSelectorImpl(this);
}
Take a look at the implementation of epoll in the JDK.
EPollSelectorImpl(SelectorProvider sp) throws IOException {
super(sp);
// makePipe returns the two file descriptors of a pipe (used to wake up the selector)
long pipeFds = IOUtil.makePipe(false);
fd0 = (int) (pipeFds >>> 32);
fd1 = (int) pipeFds;
pollWrapper = new EPollArrayWrapper();
pollWrapper.initInterrupt(fd0, fd1);
fdToKey = new HashMap<>();
}
Here an EPollArrayWrapper is created; it encapsulates the operating system’s eventpoll and calls the native method, and through it the kernel API, to create the eventpoll object.
EPollArrayWrapper() throws IOException {
// Create the epoll fd
epfd = epollCreate();
// Allocate the epoll_event array passed to epoll_wait
int allocationSize = NUM_EPOLLEVENTS * SIZE_EPOLLEVENT;
pollArray = new AllocatedNativeObject(allocationSize, true);
pollArrayAddress = pollArray.address();
// eventHigh needed when using file descriptors > 64k
if (OPEN_MAX > MAX_UPDATE_ARRAY_SIZE)
eventsHigh = new HashMap<>();
}
At this point the program has created the eventpoll object, which contains a ready queue implemented as a doubly linked list, a red-black tree holding all the sockets being listened on, and a wait queue that holds the threads blocked in selector.select().
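As a purely conceptual sketch (this is kernel state, not real kernel or JDK code; the names are invented), those three parts could be pictured like this:

// Rough Java analogue of the eventpoll structure described above
class EventPollSketch {
    // ready queue: sockets whose events have fired (the kernel uses a doubly linked list)
    java.util.Deque<Integer> readyList = new java.util.ArrayDeque<>();
    // all monitored socket fds (the kernel uses a red-black tree; TreeSet is the closest Java analogue)
    java.util.TreeSet<Integer> monitoredFds = new java.util.TreeSet<>();
    // threads blocked in epoll_wait, i.e. parked inside selector.select()
    java.util.Deque<Thread> waitQueue = new java.util.ArrayDeque<>();
}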
So here’s what happens when you call selector.select().
The selector.select() process
The MultiplexerTimeServer class invokes Selector.select(), which eventually calls all the way down to the EPollSelectorImpl.doSelect() method.
protected int doSelect(long timeout) throws IOException {
if (closed)
throw new ClosedSelectorException();
processDeregisterQueue();
try {
begin();
// Block in epoll_wait via the wrapper
pollWrapper.poll(timeout);
} finally {
end();
}
processDeregisterQueue();
int numKeysUpdated = updateSelectedKeys();
// Everything below handles the interrupted/wakeup case
if (pollWrapper.interrupted()) {
// Clear the wakeup pipe
pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);
synchronized (interruptLock) {
pollWrapper.clearInterrupted();
IOUtil.drain(fd0);
interruptTriggered = false;
}
}
return numKeysUpdated;
}
You can see that doSelect() goes straight to the pollWrapper created earlier, the wrapper around the eventpoll object. Look at its poll method.
int poll(long timeout) throws IOException {
updateRegistrations();
// This epollWait should look familiar: it maps to the kernel's epoll_wait
updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd);
for (int i = 0; i < updated; i++) {
if (getDescriptor(i) == incomingInterruptFD) {
interruptedIndex = i;
interrupted = true;
break;
}
}
return updated;
}
private native int epollWait(long pollAddress, int numfds, long timeout,
int epfd) throws IOException;
When epollWait is called, it invokes epoll_wait in the operating system kernel, which blocks the calling thread until a socket event arrives. When data is available, the kernel adds the socket to the ready queue, wakes up the blocked thread, and epoll_wait returns.
Before poll blocks, the sockets that eventpoll should listen on have to be added; that is what the updateRegistrations() call at the start of poll() does. Because we registered a ServerSocketChannel (which wraps a socket), updateRegistrations() puts that socket into the red-black tree of monitored fds via epoll_ctl and listens for connection events.
/**
* Returns the pending update events for the given file descriptor.
*/
private byte getUpdateEvents(int fd) {
if (fd < MAX_UPDATE_ARRAY_SIZE) {
return eventsLow[fd];
} else {
Byte result = eventsHigh.get(Integer.valueOf(fd));
// result should never be null
return result.byteValue();
}
}
/**
* Update the pending registrations.
*/
private void updateRegistrations() {
synchronized (updateLock) {
int j = 0;
while (j < updateCount) {
int fd = updateDescriptors[j];
// Retrieve the pending events saved in eventsLow / eventsHigh
short events = getUpdateEvents(fd);
boolean isRegistered = registered.get(fd);
int opcode = 0;
if (events != KILLED) {
// Decide which operation to pass to epoll_ctl
// (the EPOLLET event type is not specified, so this is level-triggered)
if (isRegistered) {
opcode = (events != 0) ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
} else {
opcode = (events != 0) ? EPOLL_CTL_ADD : 0;
}
if (opcode != 0) {
epollCtl(epfd, opcode, fd, events);
if (opcode == EPOLL_CTL_ADD) {
registered.set(fd);
} else if (opcode == EPOLL_CTL_DEL) {
registered.clear(fd);
}
}
}
j++;
}
updateCount = 0;
}
}
private native void epollCtl(int epfd, int opcode, int fd, int events);
The selector also holds three sets of SelectionKeys wrapping the socket fds; they tell Java which sockets are registered for listening, which sockets are ready for reading and writing, and which sockets are to be cancelled.
// Public views of the key sets
private Set<SelectionKey> publicKeys;             // Immutable
private Set<SelectionKey> publicSelectedKeys;     // Removal allowed, but not addition
private final Set<SelectionKey> cancelledKeys = new HashSet<SelectionKey>();
After EPollArrayWrapper.poll() has been called, the operating system returns from epoll_wait when a socket becomes readable/writable or a connection arrives. doSelect() then invokes the updateSelectedKeys() method.
private int updateSelectedKeys() {
// Number of entries in the ready queue
int entries = pollWrapper.updated;
int numKeysUpdated = 0;
for (int i = 0; i < entries; i++) {
// File descriptor of a ready socket
int nextFD = pollWrapper.getDescriptor(i);
SelectionKeyImpl ski = fdToKey.get(Integer.valueOf(nextFD));
// ski is null in the interrupt case
// The file descriptor is mapped back to the registered channel; if the channel
// is interested in the ready events, the count of updated keys is incremented
if (ski != null) {
int rOps = pollWrapper.getEventOps(i);
if (selectedKeys.contains(ski)) {
if (ski.channel.translateAndSetReadyOps(rOps, ski)) {
numKeysUpdated++;
}
} else {
ski.channel.translateAndSetReadyOps(rOps, ski);
if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
selectedKeys.add(ski);
numKeysUpdated++;
}
}
}
}
return numKeysUpdated;
}
Finally, the Selector maps the ready read/write sockets back to their registered channels and returns the set of keys for the channels that are ready.
In the server’s loop, the first pass handles accept and registers the resulting SocketChannel for read events; the next call to selector.select() then waits for readable data to arrive, and handleInput() is called again to read the data from the channel.
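To tie the examples together, here is a minimal blocking client sketch that can exercise either server above (the port matches the examples; the message and class name are just illustrative):

import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;

public class TimeClientSketch {
    public static void main(String[] args) throws Exception {
        try (Socket socket = new Socket("127.0.0.1", 9888)) {
            OutputStream out = socket.getOutputStream();
            // Send a full line so the BIO server's readLine() sees it
            out.write("QUERY TIME\n".getBytes("UTF-8"));
            out.flush();
            // Signal end of request so the BIO server's read loop exits
            socket.shutdownOutput();
            InputStream in = socket.getInputStream();
            byte[] buf = new byte[1024];
            int len = in.read(buf); // blocks until the server replies
            if (len > 0) {
                System.out.println("Server replied: " + new String(buf, 0, len, "UTF-8"));
            }
        }
    }
}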
Principle of the Channel
Let’s start from the problem of reading a socket through a stream. First, recall the basic socket workflow (a Java sketch follows the list):
- Create a socket fd with the socket() function to represent the communication endpoint
- Bind the port; choose the protocol stack and socket type (TCP is a stream socket)
- Listen: pending TCP connections are held in a kernel queue whose length is set by the familiar SO_BACKLOG
- Accept the connection
- Communicate: exchange data
- Close the connection
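A rough sketch of those steps with the blocking Java API (port, backlog and message are illustrative):

import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;

public class SocketWorkflowSketch {
    public static void main(String[] args) throws Exception {
        try (ServerSocket server = new ServerSocket()) {        // socket(): create the endpoint
            server.bind(new InetSocketAddress(9888), 50);       // bind + listen; 50 is the SO_BACKLOG queue length
            try (Socket client = server.accept()) {             // accept the connection
                OutputStream out = client.getOutputStream();    // communicate: exchange data
                out.write("hello\n".getBytes("UTF-8"));
            }                                                    // close the connection
        }
    }
}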
Epoll solves the accept problem by letting one thread listen on many sockets, but the data-exchange step is still an issue: here the socket reads data through a stream, and that read blocks.
If you need to read 10 bytes of data but the socket buffer is empty, the calling thread simply waits in the read until data arrives (or an exception occurs), and it keeps blocking on subsequent reads until all 10 bytes have been received. That is blocking.
In Java, creating a SocketChannel creates an operating-system socket via the socket() call; a java.net.Socket instance can also be obtained from the channel, but one is not generated by default.
1. Create a SocketChannel
// sun.nio.ch.SelectorProvider
public SocketChannel openSocketChannel() throws IOException {
// Calls the SocketChannelImpl constructor
return new SocketChannelImpl(this);
}
// sun.nio.ch.SocketChannelImpl
SocketChannelImpl(SelectorProvider sp) throws IOException {
super(sp);
// Create the socket fd
this.fd = Net.socket(true);
// Get the numeric value of the socket fd
this.fdVal = IOUtil.fdVal(fd);
this.state = ST_UNCONNECTED;
}
// sun.nio.ch.Net
static FileDescriptor socket(ProtocolFamily family, boolean stream) throws IOException {
boolean preferIPv6 = isIPv6Available() && (family != StandardProtocolFamily.INET);
// socket0 is the native call that finally creates the fd
return IOUtil.newFD(socket0(preferIPv6, stream, false));
}
// Due to oddities SO_REUSEADDR on windows reuse is ignored
private static native int socket0(boolean preferIPv6, boolean stream, boolean reuse);
As you can see, the native socket0 method is called to create the socket fd and return a file handle for the socket. The kernel does a lot of work here: validating arguments, creating and initializing the read/write buffers, and so on.
2. Reads and writes do not block
In fact, a channel read is bounded by the length of the ByteBuffer: whatever is currently in the socket buffer is copied into the ByteBuffer and the call returns. For example, if the socket buffer contains 1 byte and the ByteBuffer is 10 bytes, the read returns 1; if the socket buffer holds 20 bytes, 10 bytes are read and the call returns, without blocking.
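As a usage-level sketch of that behaviour (host and port are illustrative, and the value of n depends on what is actually sitting in the socket buffer):

SocketChannel ch = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9888));
ch.configureBlocking(false);
ByteBuffer buf = ByteBuffer.allocate(10);
int n = ch.read(buf);
// n == 1  if only 1 byte was in the socket receive buffer
// n == 10 if 20 bytes were available: buf is full, the remaining 10 stay in the kernel buffer
// n == 0  if nothing has arrived yet (non-blocking mode returns immediately)
// n == -1 if the peer has closed the connection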
Take a look at the SocketChannelImpl implementation in openJDK.
public int read(ByteBuffer buf) throws IOException {
...
// n indicates the length of the data read
int n = 0;
for (;;) {
// Read data from the socket fd; the length is determined by buf
n = IOUtil.read(fd, buf, -1, nd);
if ((n == IOStatus.INTERRUPTED) && isOpen()) {
// The system call was interrupted but the channel
// is still open, so retry
continue;
}
return IOStatus.normalize(n);
}
...
}
IOUtil.read()
static int read(FileDescriptor fd, ByteBuffer dst, long position,
NativeDispatcher nd)
throws IOException
{
if (dst.isReadOnly())
throw new IllegalArgumentException("Read-only buffer");
// A DirectBuffer is the well-known "iceberg object": a small Java object
// associated with a chunk of off-heap direct memory
if (dst instanceof DirectBuffer)
return readIntoNativeBuffer(fd, dst, position, nd);
// Otherwise substitute a temporary native (direct) buffer
ByteBuffer bb = Util.getTemporaryDirectBuffer(dst.remaining());
try {
int n = readIntoNativeBuffer(fd, bb, position, nd);
bb.flip();
if (n > 0)
dst.put(bb);
return n;
} finally {
Util.offerFirstTemporaryDirectBuffer(bb);
}
}
private static int readIntoNativeBuffer(FileDescriptor fd, ByteBuffer bb,
long position, NativeDispatcher nd)
throws IOException
{
int pos = bb.position();
int lim = bb.limit();
assert (pos <= lim);
int rem = (pos <= lim ? lim - pos : 0);
if (rem == 0)
return 0;
int n = 0;
// To read into the buffer, the native call needs to know its starting address.
// If you are interested, see
// https://stackoverflow.com/questions/11981474/pread-and-lseek-not-working-on-socket-file-descriptor
// The data is read into bb's direct memory (bb is a DirectBuffer)
if (position != -1) {
n = nd.pread(fd, ((DirectBuffer)bb).address() + pos, rem, position);
} else {
n = nd.read(fd, ((DirectBuffer)bb).address() + pos, rem);
}
if (n > 0)
bb.position(pos + n);
return n;
}
static native int pread0(FileDescriptor fd, long address, int len,
long position) throws IOException;
And you can see that after reading the data it returns straight away. The reason for using a DirectBuffer is GC: collectors that move objects, such as the mark-compact algorithm, change an object’s memory address, while the socket API call needs a fixed memory address to write into. If the object’s address changed after a GC while a read or write was in flight, the operation would corrupt memory.
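A small illustration with the standard java.nio API:

ByteBuffer heapBuf   = ByteBuffer.allocate(1024);       // backed by a byte[] on the Java heap; GC may move it
ByteBuffer directBuf = ByteBuffer.allocateDirect(1024); // backed by off-heap memory at a stable address
// IOUtil.read above copies through a temporary direct buffer when handed a heap buffer,
// so the native read always sees an address that GC will not relocate.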
So a Channel is just another wrapper around the operating system’s socket handle, exactly as Java’s Socket object is; what differs is how each one implements reading and writing, and that is what determines whether it blocks. configureBlocking(false) uses fcntl to put the fd into non-blocking mode, so whether the socket buffer holds more or fewer bytes than requested, the read copies what is available and returns without blocking.
//IOUtil.c
static int
configureBlocking(int fd, jboolean blocking)
{
int flags = fcntl(fd, F_GETFL);
int newflags = blocking ? (flags & ~O_NONBLOCK) : (flags | O_NONBLOCK);
return (flags == newflags) ? 0 : fcntl(fd, F_SETFL, newflags);
}