Epoll is a high-performance, scalable I/O event notification mechanism of the Linux kernel.

Epoll was first introduced in Linux 2.5.44. It is designed to replace the older select and poll system calls, so that programs that handle a large number of file descriptors perform better. (Wikipedia puts it this way: the older calls take O(n) time, while epoll takes O(log n) time.) Like poll, epoll monitors multiple file descriptors for events.

In an operating system, virtual memory is usually divided into user space and kernel space; this separation is part of the memory-protection mechanism. The kernel, kernel extensions, and drivers run in kernel space, while all other applications run in user space, collectively referred to as userland. epoll is built on configurable kernel objects that user space refers to through file descriptors.

A little bit more about the kernel

The kernel is the program that manages the data I/O requests issued by software and hands them to the CPU and the other electronic components of the computer for processing. Operating on hardware directly is very complex, so the kernel usually provides hardware abstractions to do it (and the kernel decides when, and for how long, a program may operate on a given piece of hardware). Interprocess communication and system calls are implemented through these abstractions.

Monolithic kernel:

In simple terms, a monolithic kernel defines a high-level abstract interface, the system call, to implement the functions of the operating system, such as process management, the file system, and storage management. These functions are performed by multiple programs running in kernel mode.

Microkernel:

A microkernel consists of a hardware abstraction layer and system calls. It includes only the parts necessary to build a system, such as thread management, address spaces, and interprocess communication. The goal of a microkernel is to separate the implementation of system services from the basic operating rules of the system.

Linux uses a monolithic kernel. Because it can load modules at run time, its functionality is still easy to extend.

What does epoll do?

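epoll keeps the monitored file descriptors in a red-black tree, so finding, adding, or removing a descriptor takes O(log n) time.

When a file descriptor is registered with an epoll instance, epoll inserts it into the instance's red-black tree and registers a callback with the underlying device (socket, pipe, and so on). When an event occurs, the callback adds the descriptor to the instance's ready list, which is what epoll_wait() hands back.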

The epoll API

int epoll_create(int size);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

(1) epoll_create

Creates an epoll instance and returns a file descriptor that refers to it. The size argument originally told the kernel how many file descriptors the caller expected to monitor; note that this is different from the first argument of select(), which is the highest-numbered descriptor plus one. If the number of monitored descriptors exceeded size, the kernel expanded its structures automatically. Since Linux 2.6.8, size no longer has this meaning and is ignored, but it must still be greater than 0 for backward compatibility (otherwise the call fails with EINVAL). Note that the epoll instance itself occupies a file descriptor: on Linux you can see it under /proc/<pid>/fd/, and you must close() it when you are done.
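To make the points above concrete, here is a minimal Linux-only sketch. It uses epoll_create1(), the variant added in Linux 2.6.27 that drops the size argument, and reads /proc/self/fd/ to show that the epoll instance really is just a file descriptor. The helpers make_epoll, describe_fd, and legacy_size_rejected are hypothetical names chosen for illustration.

```c
#define _GNU_SOURCE
#include <errno.h>
#include <stdio.h>
#include <string.h>
#include <sys/epoll.h>
#include <unistd.h>

/* Creates an epoll instance. epoll_create1(EPOLL_CLOEXEC) has no size
 * argument, and the flag closes the descriptor automatically on execve(). */
int make_epoll(void)
{
    return epoll_create1(EPOLL_CLOEXEC);
}

/* Copies the /proc/self/fd/<fd> link target into out (NUL-terminated).
 * For an epoll descriptor the target is "anon_inode:[eventpoll]".
 * Returns 0 on success, -1 on error. */
int describe_fd(int fd, char *out, size_t cap)
{
    char path[64];
    snprintf(path, sizeof path, "/proc/self/fd/%d", fd);
    ssize_t n = readlink(path, out, cap - 1);
    if (n < 0)
        return -1;
    out[n] = '\0';
    return 0;
}

/* The legacy call still rejects a non-positive size with EINVAL.
 * Returns 1 if it did, 0 otherwise. */
int legacy_size_rejected(void)
{
    errno = 0;
    int fd = epoll_create(0);
    if (fd >= 0) {
        close(fd);
        return 0;
    }
    return errno == EINVAL;
}
```

Running `ls -l /proc/<pid>/fd` on a process that uses epoll shows the same `anon_inode:[eventpoll]` entries.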

(2) epoll_ctl

Adds, modifies, or removes the registration of the target file descriptor fd in the epoll instance referred to by epfd. op is EPOLL_CTL_ADD, EPOLL_CTL_MOD, or EPOLL_CTL_DEL, which respectively register fd with the instance, change the events being watched on fd, and remove fd from the instance. If the events field of event includes the EPOLLET flag, fd is monitored in edge-triggered mode; otherwise it is level-triggered.

The events field is a bit mask built from the following macros:

  • EPOLLIN: the file descriptor is readable (this includes the peer closing the socket normally).
  • EPOLLOUT: the file descriptor is writable.
  • EPOLLPRI: the file descriptor has urgent data to read (for a socket, out-of-band data has arrived).
  • EPOLLERR: an error occurred on the file descriptor. epoll_wait always reports it, so you do not need to set it.
  • EPOLLHUP: the file descriptor was hung up. Like EPOLLERR, it is always reported.
  • EPOLLET: puts the file descriptor in edge-triggered mode, as opposed to the default level-triggered mode.
  • EPOLLONESHOT: reports only one event. After the event fires, the file descriptor is disabled; to keep monitoring the socket, re-arm it with EPOLL_CTL_MOD.

For example:

struct epoll_event ev;
// Set the file descriptor associated with the event to be handled
ev.data.fd = listenfd;
// Set the event types to handle
ev.events = EPOLLIN | EPOLLET;
// Register the descriptor with the epoll instance
epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &ev);
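The example above registers with EPOLLIN | EPOLLET. The difference between level-triggered (the default) and edge-triggered mode is easiest to see with a pipe that holds unread data. The sketch below uses a hypothetical helper, probe_trigger_mode(), that calls epoll_wait() twice without reading anything: in level-triggered mode both calls report the descriptor, while in edge-triggered mode only the first does, because no new data arrived in between.

```c
#define _GNU_SOURCE
#include <stdint.h>
#include <sys/epoll.h>
#include <unistd.h>

/* Puts two unread bytes into a fresh pipe, registers the read end for
 * EPOLLIN (plus EPOLLET if edge_triggered is non-zero), then calls
 * epoll_wait() twice with timeout 0 WITHOUT reading in between.
 * The two return values go to *first and *second.
 * Returns 0 on success, -1 on error. */
int probe_trigger_mode(int edge_triggered, int *first, int *second)
{
    int p[2];
    if (pipe(p) < 0)
        return -1;
    int epfd = epoll_create1(0);
    int rc = -1;
    if (epfd >= 0 && write(p[1], "ab", 2) == 2) {
        uint32_t events = EPOLLIN;
        if (edge_triggered)
            events |= EPOLLET;
        struct epoll_event ev = { .events = events, .data.fd = p[0] };
        if (epoll_ctl(epfd, EPOLL_CTL_ADD, p[0], &ev) == 0) {
            struct epoll_event out[4];
            *first = epoll_wait(epfd, out, 4, 0);   /* data present: reported */
            *second = epoll_wait(epfd, out, 4, 0);  /* still unread: LT reports again, ET does not */
            rc = 0;
        }
    }
    if (epfd >= 0)
        close(epfd);
    close(p[0]);
    close(p[1]);
    return rc;
}
```

This is why an edge-triggered handler must consume all available data before waiting again: otherwise the leftover bytes are never reported.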

(3) epoll_wait

Linux 2.6.19 also introduced epoll_pwait(), a variant of epoll_wait() that masks specified signals while it waits.

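epoll_wait() waits for the I/O events the caller is interested in on the registered descriptors and stores up to maxevents ready events in the events array; the return value is the number of ready descriptors. Simply put, it is called in a loop that keeps checking which fds have become readable or writable.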

When timeout is 0, epoll_wait() always returns immediately. When timeout is -1, it blocks until any registered event is ready. When timeout is a positive integer, it blocks until either the timeout (in milliseconds) expires or a registered event becomes ready. Because of kernel scheduling delays, the call may block slightly longer than timeout.
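A small sketch to observe the timeout rules, using a hypothetical timed_wait() helper that measures how long epoll_wait() actually blocked. With nothing registered, timeout 0 returns 0 at once, and a positive timeout returns 0 after at least that many milliseconds (the exact figure depends on scheduling).

```c
#define _GNU_SOURCE
#include <sys/epoll.h>
#include <time.h>
#include <unistd.h>

/* Calls epoll_wait() with the given timeout (ms) and stores the measured
 * duration of the call in *elapsed_ms. Returns epoll_wait()'s result:
 * number of ready fds, 0 on timeout, -1 on error. */
int timed_wait(int epfd, int timeout_ms, long *elapsed_ms)
{
    struct epoll_event events[8];
    struct timespec t0, t1;
    clock_gettime(CLOCK_MONOTONIC, &t0);
    int n = epoll_wait(epfd, events, 8, timeout_ms);
    clock_gettime(CLOCK_MONOTONIC, &t1);
    *elapsed_ms = (long)(t1.tv_sec - t0.tv_sec) * 1000L
                + (t1.tv_nsec - t0.tv_nsec) / 1000000L;
    return n;
}
```

Usage: `long ms; timed_wait(epfd, 50, &ms);` on an epoll instance with no ready descriptors returns 0 with `ms` around 50.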

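When you are done with an epoll instance, close its file descriptor with close(). Likewise, when a monitored file descriptor is closed, it is removed from every epoll set automatically, but only once all descriptors referring to the same open file (for example, copies made with dup() or inherited through fork()) have been closed.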
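The "only once all descriptors are closed" caveat is a classic surprise, so here is a sketch that demonstrates it with a pipe. ready_after_close() is a hypothetical helper: it registers a readable pipe, closes the registered descriptor (optionally keeping a dup() alive), and reports what epoll_wait() sees afterwards.

```c
#define _GNU_SOURCE
#include <sys/epoll.h>
#include <unistd.h>

/* Registers the read end of a pipe holding one unread byte, then closes
 * that descriptor. If keep_dup is non-zero, a dup() of the read end stays
 * open across the close. Returns the number of events epoll_wait(timeout 0)
 * reports afterwards, or -1 on setup error. */
int ready_after_close(int keep_dup)
{
    int p[2];
    if (pipe(p) < 0)
        return -1;
    int epfd = epoll_create1(0);
    if (epfd < 0) {
        close(p[0]);
        close(p[1]);
        return -1;
    }
    struct epoll_event ev = { .events = EPOLLIN, .data.fd = p[0] };
    if (write(p[1], "x", 1) != 1 || epoll_ctl(epfd, EPOLL_CTL_ADD, p[0], &ev) < 0) {
        close(p[0]);
        close(p[1]);
        close(epfd);
        return -1;
    }

    int copy = keep_dup ? dup(p[0]) : -1;
    close(p[0]);   /* removed from the interest list only if no dup remains */

    struct epoll_event out[4];
    int n = epoll_wait(epfd, out, 4, 0);

    if (copy >= 0)
        close(copy);
    close(p[1]);
    close(epfd);
    return n;
}
```

With the dup kept alive, epoll still reports an event tagged with a descriptor number that the program has already closed, so a robust program calls EPOLL_CTL_DEL before close() when descriptors may be shared.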

Epoll in practice

That was a lot of theory and my head is buzzing, so let's clear it with some practice ~

As you can see, epoll_ctl has to be called every time you add, modify, or remove a monitored file descriptor, and each call is a separate system call, so keep these calls to a minimum or their overhead can cancel out the benefits. An application with a large number of short-lived connections (such as a web server) calls epoll_ctl frequently, and it may become the bottleneck of the system.

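With traditional select and poll, each call costs time proportional to the number of monitored descriptors, and the number of calls also grows with the number of connections, so overall efficiency can degrade quadratically (or worse) as the number of online users grows linearly. This puts a significant limit on how many users a web server can support. The causes are their limit on the number of file descriptors (for select, FD_SETSIZE, typically 1024) and the inefficiency of scanning every descriptor on every call.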

The key ~

When you have a large set of sockets but, because of network latency, only some of them are “active” at any given moment, select/poll scan the entire set linearly on every call, so efficiency declines linearly with the size of the set. epoll does not have this problem: it only deals with the “active” sockets, because in the kernel epoll registers a callback on each fd. Only sockets that become active invoke their callback; idle sockets do not. In this sense epoll implements a kind of “pseudo” AIO, because the driving force lies in the OS kernel. In some benchmarks, when essentially all sockets are active (for example in a high-speed LAN environment), epoll is no more efficient than select/poll, and if epoll_ctl is called too often it is even slightly less efficient. However, once idle connections are used to simulate a WAN environment, epoll is far more efficient than select/poll.
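The “only active sockets” behavior can be checked directly. The sketch below (hypothetical helper find_active(), with pipes standing in for sockets) registers many descriptors with one epoll instance, makes just one of them readable, and shows that a single epoll_wait() returns exactly that one, identified through data.u32, without the caller scanning the idle ones.

```c
#define _GNU_SOURCE
#include <stdint.h>
#include <sys/epoll.h>
#include <unistd.h>

#define MAX_PIPES 64

/* Registers n pipes with one epoll instance (data.u32 = pipe index), makes
 * only pipe `active` readable, and waits once with timeout 0. Stores the
 * number of ready events in *nready and returns the index reported first,
 * or -1 on error or if nothing was reported. */
int find_active(int n, int active, int *nready)
{
    int fds[MAX_PIPES][2];
    int opened = 0, registered = 0, result = -1;

    *nready = -1;
    if (n < 1 || n > MAX_PIPES || active < 0 || active >= n)
        return -1;
    int epfd = epoll_create1(0);
    if (epfd < 0)
        return -1;

    while (opened < n && pipe(fds[opened]) == 0) {
        struct epoll_event ev = { .events = EPOLLIN, .data.u32 = (uint32_t)opened };
        opened++;
        if (epoll_ctl(epfd, EPOLL_CTL_ADD, fds[opened - 1][0], &ev) < 0)
            break;
        registered++;
    }

    /* Only one of the n registered pipes becomes active. */
    if (registered == n && write(fds[active][1], "!", 1) == 1) {
        struct epoll_event ready[MAX_PIPES];
        *nready = epoll_wait(epfd, ready, MAX_PIPES, 0);
        if (*nready > 0)
            result = (int)ready[0].data.u32;
    }

    for (int i = 0; i < opened; i++) {
        close(fds[i][0]);
        close(fds[i][1]);
    }
    close(epfd);
    return result;
}
```

With poll(), the equivalent program would pass all n descriptors on every call and then loop over all n results to find the one with revents set.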

#include <errno.h>
#include <stdio.h>
#include <unistd.h>
#include <netinet/in.h>
#include <sys/epoll.h>
#include <sys/socket.h>

#define POLL_SIZE  256
#define MAX_EVENTS 20
#define MAXSIZE    4096

int setnonblocking(int fd);   /* puts fd in non-blocking mode; not defined in the original */

int event_loop(int listener)
{
    char buffer[MAXSIZE];
    ssize_t buffered = 0;                    /* one shared buffer: fine for a demo, not for many clients */
    struct epoll_event ev;
    struct epoll_event events[MAX_EVENTS];   /* epoll_wait() needs real storage, not a NULL pointer */
    int epfd = epoll_create(POLL_SIZE);
    if (epfd < 0) { perror("epoll_create"); return -1; }

    ev.events = EPOLLIN;                     /* watch the listening socket for new connections */
    ev.data.fd = listener;
    if (epoll_ctl(epfd, EPOLL_CTL_ADD, listener, &ev) < 0) { perror("epoll_ctl"); close(epfd); return -1; }

    for (;;) {
        int nfds = epoll_wait(epfd, events, MAX_EVENTS, 500);
        if (nfds < 0) {
            if (errno == EINTR) continue;
            perror("epoll_wait");
            break;
        }
        for (int n = 0; n < nfds; ++n) {
            if (events[n].data.fd == listener) {
                /* A new client is connecting: accept it */
                struct sockaddr_in local;
                socklen_t addrlen = sizeof(local);
                int client = accept(listener, (struct sockaddr *)&local, &addrlen);
                if (client < 0) { perror("accept"); continue; }
                setnonblocking(client);      /* put the new connection in non-blocking mode */
                /* Add the new connection to epoll's interest list. EPOLLIN | EPOLLET does not
                 * watch for writability; to be notified of that too, use EPOLLIN | EPOLLOUT | EPOLLET. */
                ev.events = EPOLLIN | EPOLLET;
                ev.data.fd = client;
                if (epoll_ctl(epfd, EPOLL_CTL_ADD, client, &ev) < 0) {
                    fprintf(stderr, "epoll set insertion error: fd=%d\n", client);
                    close(client);
                    continue;
                }
            } else if (events[n].events & EPOLLIN) {
                /* A connected client sent data: read it
                 * (with EPOLLET a robust server keeps reading until EAGAIN) */
                int sockfd_r = events[n].data.fd;
                buffered = read(sockfd_r, buffer, MAXSIZE);
                if (buffered <= 0) { close(sockfd_r); continue; }   /* EOF or error */
                /* Change the event watched on sockfd_r to EPOLLOUT */
                ev.data.fd = sockfd_r;
                ev.events = EPOLLOUT | EPOLLET;
                epoll_ctl(epfd, EPOLL_CTL_MOD, sockfd_r, &ev);
            } else if (events[n].events & EPOLLOUT) {
                /* The socket is writable: send back what was read */
                int sockfd_w = events[n].data.fd;
                write(sockfd_w, buffer, (size_t)buffered);
                /* Change the event watched on sockfd_w back to EPOLLIN */
                ev.data.fd = sockfd_w;
                ev.events = EPOLLIN | EPOLLET;
                epoll_ctl(epfd, EPOLL_CTL_MOD, sockfd_w, &ev);
            }
        }
    }
    close(epfd);
    return -1;
}

Briefly describe the process:

  • Listen for new connections and accept them.
  • If a connected client has sent data, read it, then change the event watched on sockfd_r to EPOLLOUT (writable).
  • Once the socket is writable, write the data, then change the event watched on sockfd_w back to EPOLLIN (readable).
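The loop above calls setnonblocking() without defining it, and in EPOLLET mode it reads only once per notification. Here is a minimal sketch of both missing pieces, assuming the descriptor is a socket or pipe: set_nonblocking() is one possible implementation of the original's helper using fcntl(), and the hypothetical drain() keeps reading until read() fails with EAGAIN, which is what an edge-triggered handler has to do before returning to epoll_wait().

```c
#define _GNU_SOURCE
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>

/* One possible setnonblocking(): add O_NONBLOCK to the descriptor's flags. */
int set_nonblocking(int fd)
{
    int flags = fcntl(fd, F_GETFL, 0);
    if (flags < 0)
        return -1;
    return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}

/* Reads a non-blocking fd until it would block (EAGAIN) or reaches EOF.
 * The data is discarded here; a real server would buffer it. The small
 * chunk size makes the loop run several times. Sets *eof to 1 if the peer
 * closed. Returns the total bytes read, or -1 on a real error. */
ssize_t drain(int fd, int *eof)
{
    char chunk[4];
    ssize_t total = 0;
    *eof = 0;
    for (;;) {
        ssize_t n = read(fd, chunk, sizeof chunk);
        if (n > 0) {
            total += n;
        } else if (n == 0) {
            *eof = 1;
            return total;
        } else if (errno == EINTR) {
            continue;
        } else if (errno == EAGAIN || errno == EWOULDBLOCK) {
            return total;   /* fully drained: safe to go back to epoll_wait() */
        } else {
            return -1;
        }
    }
}
```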

How does epoll get called in Java?

Basic knowledge:

File descriptors:

  • (See “Unix Network Programming”)
  • A file descriptor is an int that identifies a file on a Unix system. In Unix philosophy, everything is a file, so each resource (including regular files, directories, pipes, POSIX IPC, sockets) can be viewed as a file.
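A quick way to see that a file descriptor really is just a small int: descriptor-allocating calls such as dup() return the lowest-numbered descriptor not currently open in the process, so a freed number is reused immediately. The hypothetical helper below relies on Linux allocating a pipe's two ends from the lowest free numbers, and assumes no other thread opens or closes descriptors at the same time.

```c
#define _GNU_SOURCE
#include <unistd.h>

/* Creates a pipe, closes its read end, then dup()s the write end.
 * Returns 1 if the duplicate reused the number just freed, 0 if not,
 * -1 on error. */
int lowest_number_reused(void)
{
    int p[2];
    if (pipe(p) < 0)
        return -1;
    int freed = p[0];
    close(p[0]);
    int copy = dup(p[1]);          /* gets the lowest free number */
    int reused = (copy == freed);
    if (copy >= 0)
        close(copy);
    close(p[1]);
    return copy < 0 ? -1 : reused;
}
```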

In the Java NIO world, the Selector is the central controller, the Buffer is the container that holds the data, and the Channel is the front line: the bridge through which data flows to and from local I/O devices and the network.

  • Network I/O devices:
      • DatagramChannel: reads and writes UDP data; corresponds to the DatagramSocket class
      • SocketChannel: reads and writes TCP data; corresponds to the Socket class
      • ServerSocketChannel: listens for new TCP connections and creates a read-write SocketChannel for each one; corresponds to the ServerSocket class
  • Local I/O devices:
      • FileChannel: reads and writes data in a local file. It cannot be controlled by a Selector; it is obtained from FileInputStream, FileOutputStream, or RandomAccessFile

① Let’s start with the simplest ServerSocketChannel

ServerSocketChannel is a socket listener like ServerSocket. The main difference is that ServerSocketChannel can run in non-blocking mode.

// Create a ServerSocketChannel
public static ServerSocketChannel open() throws IOException {
    return SelectorProvider.provider().openServerSocketChannel();
}

Like ServerSocket, ServerSocketChannel is ultimately backed by the underlying operating system. Its implementation class is ServerSocketChannelImpl; let's take a look at its constructor:

ServerSocketChannelImpl(SelectorProvider var1) throws IOException {
    super(var1);
    // Create the socket file descriptor
    this.fd = Net.serverSocket(true);
    // Get the integer value (index) of the file descriptor
    this.fdVal = IOUtil.fdVal(this.fd);
    this.state = 0;
}

Creating a ServerSocketChannelImpl essentially creates an fd (file descriptor) in the underlying operating system, establishing a channel for network communication. The socket is then bound with bind(), and incoming TCP connections are obtained from the operating system through accept():

public SocketChannel accept() throws IOException {
    // Some validation and unrelated code omitted...
    SocketChannelImpl var2 = null;
    /*
     * Status codes (IOStatus):
     * EOF = -1;
     * UNAVAILABLE = -2;
     * INTERRUPTED = -3;
     * UNSUPPORTED = -4;
     * THROWN = -5;
     * UNSUPPORTED_CASE = -6;
     */
    int var3 = 0;
    // File descriptor for the accepted connection
    FileDescriptor var4 = new FileDescriptor();
    // Receives the remote address of the accepted connection
    InetSocketAddress[] var5 = new InetSocketAddress[1];
    try {
        // Install an interruptor that closes the channel if the thread is interrupted
        this.begin();
        do {
            var3 = this.accept(this.fd, var4, var5);
        } while (var3 == -3 && this.isOpen());   // retry while INTERRUPTED
    } finally {
        // Throws AsynchronousCloseException if the channel was closed and accept failed
        this.end(var3 > 0);
        // Verify that the status code is valid
        assert IOStatus.check(var3);
    }
    if (var3 < 1) {
        return null;
    } else {
        // The accepted connection is blocking by default
        IOUtil.configureBlocking(var4, true);
        var2 = new SocketChannelImpl(this.provider(), var4, var5[0]);
        // Check whether the connection was successful (omitted)
        return var2;
    }
}

// Relies on accept0, which is implemented natively by the underlying operating system
private int accept(FileDescriptor var1, FileDescriptor var2, InetSocketAddress[] var3) throws IOException {
    return this.accept0(var1, var2, var3);
}
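In the code above, accept() returns null when the status is below 1. At the system-call level, the non-blocking case looks like this: accept() on a non-blocking listening socket fails with EAGAIN when no connection is pending, which (as far as the decompiled code shows) ServerSocketChannelImpl reports as UNAVAILABLE (-2) and turns into null. This sketch uses loopback only; open_listener, try_accept, wait_acceptable, and connect_loopback are hypothetical helpers, and the listener is roughly what ServerSocketChannel.open() + bind() + configureBlocking(false) set up natively.

```c
#define _GNU_SOURCE
#include <arpa/inet.h>
#include <errno.h>
#include <netinet/in.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>

/* Non-blocking TCP listener on 127.0.0.1 with a kernel-chosen port,
 * stored in *port. Returns the listening fd, or -1 on error. */
int open_listener(unsigned short *port)
{
    int fd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
    if (fd < 0)
        return -1;
    struct sockaddr_in addr = {0};
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
    addr.sin_port = 0;                       /* 0: let the kernel choose */
    socklen_t len = sizeof addr;
    if (bind(fd, (struct sockaddr *)&addr, sizeof addr) < 0 || listen(fd, 16) < 0 ||
        getsockname(fd, (struct sockaddr *)&addr, &len) < 0) {
        close(fd);
        return -1;
    }
    *port = ntohs(addr.sin_port);
    return fd;
}

/* accept() on the non-blocking listener: a new fd, or -1 with errno set.
 * EAGAIN/EWOULDBLOCK means "no connection pending yet". */
int try_accept(int listen_fd)
{
    return accept(listen_fd, NULL, NULL);
}

/* Waits up to timeout_ms for EPOLLIN on the listener, which for a
 * listening socket means a connection is ready to accept. */
int wait_acceptable(int listen_fd, int timeout_ms)
{
    int epfd = epoll_create1(0);
    if (epfd < 0)
        return -1;
    struct epoll_event ev = { .events = EPOLLIN, .data.fd = listen_fd }, out;
    int n = -1;
    if (epoll_ctl(epfd, EPOLL_CTL_ADD, listen_fd, &ev) == 0)
        n = epoll_wait(epfd, &out, 1, timeout_ms);
    close(epfd);
    return n;
}

/* Ordinary blocking client connect to 127.0.0.1:port. */
int connect_loopback(unsigned short port)
{
    int fd = socket(AF_INET, SOCK_STREAM, 0);
    if (fd < 0)
        return -1;
    struct sockaddr_in addr = {0};
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
    addr.sin_port = htons(port);
    if (connect(fd, (struct sockaddr *)&addr, sizeof addr) < 0) {
        close(fd);
        return -1;
    }
    return fd;
}
```

Waiting for EPOLLIN before accepting is also what a Selector does for a ServerSocketChannel registered with OP_ACCEPT.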

② SocketChannel

It reads and writes TCP data and plays the role of the client:

  1. Create a SocketChannel with the open method.
  2. Use the connect method to initiate a connection to the server; methods for checking the connection state are also provided.
  3. Use read and write for the basic read and write operations.

open

public static SocketChannel open() throws IOException {
    return SelectorProvider.provider().openSocketChannel();
}

public SocketChannel openSocketChannel() throws IOException {
    return new SocketChannelImpl(this);
}

// State, increases monotonically
private static final int ST_UNINITIALIZED = -1;
private static final int ST_UNCONNECTED = 0;
private static final int ST_PENDING = 1;
private static final int ST_CONNECTED = 2;
private static final int ST_KILLPENDING = 3;
private static final int ST_KILLED = 4;
private int state = ST_UNINITIALIZED;

SocketChannelImpl(SelectorProvider sp) throws IOException {
    super(sp);
    // Create the socket file descriptor
    this.fd = Net.socket(true);
    // Get the integer value of the file descriptor
    this.fdVal = IOUtil.fdVal(fd);
    this.state = ST_UNCONNECTED;
}

connect: establishing a connection

public boolean connect(SocketAddress var1) throws IOException {
    boolean var2 = false;
    // Take the read lock and then the write lock
    synchronized (this.readLock) {
        synchronized (this.writeLock) {
            /**** Check the channel state and the address ****/
            // Is the channel open and not yet connected?
            this.ensureOpenAndUnconnected();
            InetSocketAddress var5 = Net.checkAddress(var1);
            SecurityManager var6 = System.getSecurityManager();
            if (var6 != null) {
                var6.checkConnect(var5.getAddress().getHostAddress(), var5.getPort());
            }
            boolean var10000;
            synchronized (this.blockingLock()) {
                int var8 = 0;
                try {
                    this.begin();
                    synchronized (this.stateLock) {
                        if (!this.isOpen()) {
                            boolean var10 = false;
                            return var10;
                        }
                        if (this.localAddress == null) {
                            // Hook called before the TCP connection is made (see NetHooks below)
                            NetHooks.beforeTcpConnect(this.fd, var5.getAddress(), var5.getPort());
                        }
                        this.readerThread = NativeThread.current();
                    }
                    do {
                        InetAddress var9 = var5.getAddress();
                        if (var9.isAnyLocalAddress()) {
                            var9 = InetAddress.getLocalHost();
                        }
                        // Connect through the native connect call
                        var8 = Net.connect(this.fd, var9, var5.getPort());
                    } while (var8 == -3 && this.isOpen());
                    synchronized (this.stateLock) {
                        this.remoteAddress = var5;
                        if (var8 <= 0) {
                            if (!this.isBlocking()) {
                                this.state = 1;   // ST_PENDING
                            } else {
                                assert false;
                            }
                        } else {
                            this.state = 2;       // ST_CONNECTED
                            if (this.isOpen()) {
                                this.localAddress = Net.localAddress(this.fd);
                            }
                            var10000 = true;
                            return var10000;
                        }
                    }
                } finally {
                    // cleanup code omitted in the decompiled source
                }
                var10000 = false;
                return var10000;
            }
        }
    }
}
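The code above sets state 1 (ST_PENDING) when a non-blocking connect has not completed and state 2 (ST_CONNECTED) when it has. At the system-call level, connect() on a non-blocking socket usually fails with EINPROGRESS; the application then waits for EPOLLOUT and reads SO_ERROR to learn whether the connection succeeded, which is conceptually what finishing a pending connection involves. A sketch on loopback, with hypothetical helpers start_connect, finish_connect, and open_listener and hypothetical ST_* result codes:

```c
#define _GNU_SOURCE
#include <arpa/inet.h>
#include <errno.h>
#include <netinet/in.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>

/* Hypothetical result codes echoing SocketChannelImpl's states. */
enum { ST_FAILED = -1, ST_PENDING = 1, ST_CONNECTED = 2 };

static struct sockaddr_in loopback(unsigned short port)
{
    struct sockaddr_in addr = {0};
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
    addr.sin_port = htons(port);
    return addr;
}

/* Listening socket on 127.0.0.1 with a kernel-chosen port (stored in *port). */
int open_listener(unsigned short *port)
{
    int fd = socket(AF_INET, SOCK_STREAM, 0);
    struct sockaddr_in addr = loopback(0);
    socklen_t len = sizeof addr;
    if (fd < 0)
        return -1;
    if (bind(fd, (struct sockaddr *)&addr, sizeof addr) < 0 || listen(fd, 16) < 0 ||
        getsockname(fd, (struct sockaddr *)&addr, &len) < 0) {
        close(fd);
        return -1;
    }
    *port = ntohs(addr.sin_port);
    return fd;
}

/* Starts a non-blocking connect to 127.0.0.1:port; the socket goes to *out. */
int start_connect(unsigned short port, int *out)
{
    struct sockaddr_in addr = loopback(port);
    int fd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
    *out = fd;
    if (fd < 0)
        return ST_FAILED;
    if (connect(fd, (struct sockaddr *)&addr, sizeof addr) == 0)
        return ST_CONNECTED;                       /* completed immediately */
    return errno == EINPROGRESS ? ST_PENDING : ST_FAILED;
}

/* Waits up to timeout_ms for EPOLLOUT, then reads SO_ERROR. Returns 0 if
 * the connection succeeded, an errno value if it failed, or -1 if the wait
 * itself failed or timed out. */
int finish_connect(int fd, int timeout_ms)
{
    int epfd = epoll_create1(0);
    if (epfd < 0)
        return -1;
    struct epoll_event ev = { .events = EPOLLOUT, .data.fd = fd }, out;
    int n = -1;
    if (epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev) == 0)
        n = epoll_wait(epfd, &out, 1, timeout_ms);
    close(epfd);
    if (n != 1)
        return -1;
    int err = 0;
    socklen_t len = sizeof err;
    if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
        return -1;
    return err;
}
```

A failed connection (for example, nothing listening on the port) shows up as a non-zero SO_ERROR such as ECONNREFUSED, not as a failed epoll_wait().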

Before binding (and, as shown above, before connecting when the socket has no local address yet), the JDK calls NetHooks.beforeTcpBind or NetHooks.beforeTcpConnect. If SDP support is enabled and one of its rules matches, these hooks convert the fd into an SDP (Sockets Direct Protocol) socket. SDP requires network adapters that support InfiniBand high-speed networking; Windows does not support this protocol.

Let’s take a look at NetHooks.java in OpenJDK (src/solaris/classes/sun/net):

private static final Provider provider = new sun.net.sdp.SdpProvider();

public static void beforeTcpBind(FileDescriptor fdObj, InetAddress address, int port) throws IOException
{
    provider.implBeforeTcpBind(fdObj, address, port);
}

public static void beforeTcpConnect(FileDescriptor fdObj, InetAddress address, int port) throws IOException
{
    provider.implBeforeTcpConnect(fdObj, address, port);
}

So what actually runs is implBeforeTcpBind (or implBeforeTcpConnect) in SdpProvider:

@Override
public void implBeforeTcpBind(FileDescriptor fdObj, InetAddress address, int port) throws IOException {
    if (enabled)
        convertTcpToSdpIfMatch(fdObj, Action.BIND, address, port);
}

// converts unbound TCP socket to a SDP socket if it matches the rules
private void convertTcpToSdpIfMatch(FileDescriptor fdObj, Action action, InetAddress address, int port) throws IOException {
    boolean matched = false;
    // First check the rules (typically PortRangeRule) to decide whether this socket qualifies,
    // then convert the fd's socket
    for (Rule rule : rules) {
        if (rule.match(action, address, port)) {
            SdpSupport.convertSocket(fdObj);
            matched = true;
            break;
        }
    }
}

public static void convertSocket(FileDescriptor fd) throws IOException {
    ...
    // Get the integer value of the fd
    int fdVal = fdAccess.get(fd);
    convert0(fdVal);
}

// convert0
JNIEXPORT void JNICALL
Java_sun_net_sdp_SdpSupport_convert0(JNIEnv *env, jclass cls, int fd)
{
    // create() makes the actual socket: socket(AF_INET_SDP, SOCK_STREAM, 0)
    int s = create(env);
    if (s >= 0) {
        socklen_t len;
        int arg, res;
        struct linger linger;

        /* copy socket options that are relevant to SDP */
        len = sizeof(arg);
        if (getsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char*)&arg, &len) == 0)
            setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (char*)&arg, len);
        len = sizeof(arg);
        if (getsockopt(fd, SOL_SOCKET, SO_OOBINLINE, (char*)&arg, &len) == 0)
            setsockopt(s, SOL_SOCKET, SO_OOBINLINE, (char*)&arg, len);
        len = sizeof(linger);
        if (getsockopt(fd, SOL_SOCKET, SO_LINGER, (void*)&linger, &len) == 0)
            setsockopt(s, SOL_SOCKET, SO_LINGER, (char*)&linger, len);

        // Make fd refer to the new SDP socket, then close the temporary descriptor
        RESTARTABLE(dup2(s, fd), res);
        if (res < 0)
            JNU_ThrowIOExceptionWithLastError(env, "dup2");
        RESTARTABLE(close(s), res);
    }
}

read

public int read(ByteBuffer var1) throws IOException {
    synchronized (this.readLock) {
        this.begin();
        synchronized (this.stateLock) {
            // ... open/state checks omitted ...
        }
        do {
            // Read data from the fd into the buffer through IOUtil
            var3 = IOUtil.read(this.fd, var1, -1L, nd);
        } while (var3 == -3 && this.isOpen());
        // normalize() returns 0 for UNAVAILABLE (-2) and n otherwise
        var4 = IOStatus.normalize(var3);
        // ... (decompiler label code omitted)
        this.readerCleanup();
        assert IOStatus.check(var3);
        return var4;
    }
}

static int read(FileDescriptor var0, ByteBuffer var1, long var2, NativeDispatcher var4) throws IOException {
    if (var1.isReadOnly()) {
        throw new IllegalArgumentException("Read-only buffer");
    } else if (var1 instanceof DirectBuffer) {
        return readIntoNativeBuffer(var0, var1, var2, var4);
    } else {
        // Temporary buffer of size remaining() (limit - position), allocated off-heap
        // like ByteBuffer.allocateDirect(size).
        // Note: the finally block below hands this memory back after use
        ByteBuffer var5 = Util.getTemporaryDirectBuffer(var1.remaining());
        int var7;
        try {
            // Read into the temporary direct buffer
            int var6 = readIntoNativeBuffer(var0, var5, var2, var4);
            var5.flip();
            if (var6 > 0) {
                // Copy the data into the caller's buffer
                var1.put(var5);
            }
            var7 = var6;
        } finally {
            Util.offerFirstTemporaryDirectBuffer(var5);
        }
        return var7;
    }
}

private static int readIntoNativeBuffer(FileDescriptor var0, ByteBuffer var1, long var2, NativeDispatcher var4) throws IOException {
    // ... init: var5 = buffer position, var7 = bytes remaining ...
    int var9;
    if (var2 != -1L) {
        // Positional read (pread) at offset var2
        var9 = var4.pread(var0, ((DirectBuffer) var1).address() + (long) var5, var7, var2);
    } else {
        // Calls SocketDispatcher.read -> FileDispatcherImpl.read0
        var9 = var4.read(var0, ((DirectBuffer) var1).address() + (long) var5, var7);
    }
    if (var9 > 0) {
        var1.position(var5 + var9);
    }
    return var9;
}

// OpenJDK: src/solaris/native/sun/nio/ch/FileDispatcherImpl.c
JNIEXPORT jint JNICALL
Java_sun_nio_ch_FileDispatcherImpl_read0(JNIEnv *env, jclass clazz,
                                         jobject fdo, jlong address, jint len)
{
    jint fd = fdval(env, fdo);
    // Address of the native buffer
    void *buf = (void *)jlong_to_ptr(address);
    // Call the underlying read()
    return convertReturnVal(env, read(fd, buf, len), JNI_TRUE);
}

Summary of the read process:

  1. Obtain a temporary direct buffer, unless the destination buffer is already direct.
  2. Read into the direct buffer by calling the underlying read().
  3. Finally, copy the contents of the direct buffer into the caller's buffer.
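The three steps can be sketched in plain C. This is an illustration of the idea, not the JDK's code: struct heap_buf is a hypothetical stand-in for a heap ByteBuffer (only data, position, and limit), malloc() stands in for the temporary direct buffer, and the JDK caches that buffer per thread instead of freeing it every time.

```c
#define _GNU_SOURCE
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

/* Hypothetical stand-in for a heap ByteBuffer. */
struct heap_buf {
    char  *data;
    size_t position;
    size_t limit;
};

/* Sketch of IOUtil.read() for a non-direct buffer. Returns the byte count
 * from read(): > 0 bytes copied, 0 at EOF (or nothing remaining), -1 on error. */
ssize_t read_via_temp(int fd, struct heap_buf *dst)
{
    size_t remaining = dst->limit - dst->position;
    if (remaining == 0)
        return 0;
    char *tmp = malloc(remaining);              /* step 1: temporary native buffer */
    if (tmp == NULL)
        return -1;
    ssize_t n = read(fd, tmp, remaining);       /* step 2: the real read() */
    if (n > 0) {                                /* step 3: copy into the caller's buffer */
        memcpy(dst->data + dst->position, tmp, (size_t)n);
        dst->position += (size_t)n;
    }
    free(tmp);
    return n;
}
```

The extra copy in step 3 is the cost of using a heap buffer, which is one reason performance-sensitive NIO code tends to use direct buffers.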

write

Having gone through read, you will find that write follows basically the same path; the details are below:

public int write(ByteBuffer var1) throws IOException {
    if (var1 == null) {
        throw new NullPointerException();
    } else {
        synchronized (this.writeLock) {
            this.ensureWriteOpen();
            this.begin();
            synchronized (this.stateLock) {
                if (!this.isOpen()) {
                    // ... returns 0 (decompiler label code simplified)
                    return 0;
                }
                this.writerThread = NativeThread.current();
            }
            do {
                // Write the buffer's contents to the fd through IOUtil
                var3 = IOUtil.write(this.fd, var1, -1L, nd);
            } while (var3 == -3 && this.isOpen());
            var4 = IOStatus.normalize(var3);
            this.writerCleanup();
            assert IOStatus.check(var3);
            return var4;
        }
    }
}

static int write(FileDescriptor var0, ByteBuffer var1, long var2, NativeDispatcher var4) throws IOException {
    if (var1 instanceof DirectBuffer) {
        return writeFromNativeBuffer(var0, var1, var2, var4);
    } else {
        // ... var5 = buf position, var7 = bytes remaining (computation omitted) ...
        ByteBuffer var8 = Util.getTemporaryDirectBuffer(var7);
        int var10;
        try {
            // Copy buf into the temporary direct buffer, then restore buf's position
            var8.put(var1);
            var8.flip();
            var1.position(var5);
            // Write from the direct buffer
            int var9 = writeFromNativeBuffer(var0, var8, var2, var4);
            if (var9 > 0) {
                // Advance buf's position by the number of bytes actually written
                var1.position(var5 + var9);
            }
            var10 = var9;
        } finally {
            Util.offerFirstTemporaryDirectBuffer(var8);
        }
        return var10;
    }
}

IOUtil.writeFromNativeBuffer(fd, bb, position, nd) {
    // ...
    int written = 0;
    if (position != -1) {
        // Positional write (pwrite)
        written = nd.pwrite(fd, ((DirectBuffer) bb).address() + pos, rem, position);
    } else {
        // Calls SocketDispatcher.write -> FileDispatcherImpl.write0
        written = nd.write(fd, ((DirectBuffer) bb).address() + pos, rem);
    }
    // ...
}

FileDispatcherImpl.write0 {
    // Call the underlying write()
    return convertReturnVal(env, write(fd, buf, len), JNI_FALSE);
}

To summarize the write process:

  1. If buf is a direct buffer, write from it directly. Otherwise, obtain a temporary direct buffer with buf.remaining() bytes.
  2. Copy the contents of buf into the direct buffer and restore buf's position.
  3. Write to the channel by calling the underlying write().
  4. Advance buf's position by the number of bytes actually written.
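Step 4 matters because write() may accept fewer bytes than requested, especially on a non-blocking descriptor whose kernel buffer is nearly full. The sketch below mirrors the steps in C under the same assumptions as a heap ByteBuffer stand-in: struct heap_buf and write_via_temp() are hypothetical, and malloc() plays the role of the temporary direct buffer.

```c
#define _GNU_SOURCE
#include <fcntl.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

/* Hypothetical stand-in for a heap ByteBuffer being written out. */
struct heap_buf {
    const char *data;
    size_t      position;
    size_t      limit;
};

/* Sketch of IOUtil.write() for a non-direct buffer: copy the remaining
 * bytes into a temporary buffer (src->position is left unchanged), write()
 * from it, then advance src->position only by what the kernel accepted.
 * Returns write()'s result. */
ssize_t write_via_temp(int fd, struct heap_buf *src)
{
    size_t remaining = src->limit - src->position;
    if (remaining == 0)
        return 0;
    char *tmp = malloc(remaining);
    if (tmp == NULL)
        return -1;
    memcpy(tmp, src->data + src->position, remaining);
    ssize_t n = write(fd, tmp, remaining);
    if (n > 0)
        src->position += (size_t)n;       /* a partial write leaves the rest for later */
    free(tmp);
    return n;
}
```

For example, writing 1 MiB to an empty non-blocking pipe typically returns only the pipe's capacity (64 KiB by default on Linux), and the buffer's position records exactly how far the write got.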

Be patient. Epoll is coming up soon

Now that you understand the basics, the next section will cover how Java uses epoll.

A brief look at Selector

The purpose of a Selector is to manage a set of multiplexed SelectableChannel objects in Java NIO and to determine which of those channels are ready for events such as reads and writes. (Java Doc)

Creating a Selector and registering a channel with it looks like this:

// 1. Create a Selector
Selector selector = Selector.open();
// 2. Register the Channel with the selector
// ....... (code that creates the channel omitted) ....
// Note: a Channel registered with a Selector must be in non-blocking mode, so a FileChannel
// cannot be used with a Selector, because FileChannel is always blocking
channel.configureBlocking(false);
SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
// Several events can be combined with bitwise OR (|), for example:
// SelectionKey key = channel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
// Note that a Channel can be registered with a given Selector only once; registering it again
// just updates the interest set of the existing SelectionKey
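Underneath, epoll behaves differently: calling EPOLL_CTL_ADD a second time for the same fd fails with EEXIST, and the interest set has to be changed with EPOLL_CTL_MOD. The hypothetical helper below gives C code the Java-style "register again to update" behavior. From my reading of EPollArrayWrapper, the JDK instead tracks which fds it has already registered and picks ADD or MOD itself, so treat this as an illustration of the semantics rather than the JDK's approach.

```c
#define _GNU_SOURCE
#include <errno.h>
#include <stdint.h>
#include <sys/epoll.h>
#include <unistd.h>

/* Adds fd to the epoll interest list or, if it is already there
 * (EPOLL_CTL_ADD fails with EEXIST), replaces its events with
 * EPOLL_CTL_MOD. Returns 0 on success, -1 on error. */
int register_or_update(int epfd, int fd, uint32_t events)
{
    struct epoll_event ev = { .events = events, .data.fd = fd };
    if (epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev) == 0)
        return 0;
    if (errno != EEXIST)
        return -1;
    return epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);
}
```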

Registering a Channel with a Selector produces a SelectionKey. The event types a SelectionKey can express are:

  • OP_READ: a read event (the channel is readable). Value: 1 << 0
  • OP_WRITE: a write event (the channel is writable). Value: 1 << 2
  • OP_CONNECT: the client's TCP connection to the server has completed; generally used with a SocketChannel client channel. Value: 1 << 3
  • OP_ACCEPT: the server can accept a connection from a client; generally used with a ServerSocketChannel server channel. Value: 1 << 4
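On Linux, these Java interest bits must eventually become epoll event bits. The sketch below shows the natural mapping: OP_READ and OP_ACCEPT both mean "readable" (EPOLLIN) on the socket, while OP_WRITE and OP_CONNECT both mean "writable" (EPOLLOUT). As far as I know this matches what the JDK's translateInterestOps methods do on Linux, but treat it as an illustration; the OP_* constants are copied from the values listed above, and interest_to_epoll() is a hypothetical name.

```c
#define _GNU_SOURCE
#include <stdint.h>
#include <sys/epoll.h>

/* Java's SelectionKey bit values, as plain C constants. */
enum {
    OP_READ    = 1 << 0,
    OP_WRITE   = 1 << 2,
    OP_CONNECT = 1 << 3,
    OP_ACCEPT  = 1 << 4,
};

/* Translates a Java interest set into epoll event bits. */
uint32_t interest_to_epoll(int ops)
{
    uint32_t events = 0;
    if (ops & (OP_READ | OP_ACCEPT))
        events |= EPOLLIN;     /* data to read, or a connection to accept */
    if (ops & (OP_WRITE | OP_CONNECT))
        events |= EPOLLOUT;    /* room to write, or the connect has finished */
    return events;
}
```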

A Selector maintains three sets of keys internally:

  1. Key set: all keys registered with this Selector; obtained by calling keys()
  2. Selected-key set: the keys whose channels were found ready for at least one of their interest events; obtained by calling selectedKeys()
  3. Cancelled-key set: keys that have been cancelled (via SelectionKey#cancel() or by closing the channel) but whose channels have not been deregistered yet; not accessible from outside

The Selector class contains the following 10 methods:

  • open(): creates a Selector object
  • isOpen(): whether the Selector is open; returns false after close() has been called
  • provider(): returns the SelectorProvider that created this Selector
  • keys(): as described above, returns all keys registered with this Selector
  • selectedKeys(): returns the selected-key set, i.e. the keys whose channels are ready
  • selectNow(): checks for ready channels without blocking and returns immediately; a return value > 0 means one or more channels are ready
  • select(long timeout): blocking version of selectNow(); returns when at least one channel is ready, or when the timeout expires (also when wakeup() is called or the thread is interrupted)
  • select(): blocking version of selectNow() with no timeout; returns when at least one channel is ready (or on wakeup() or interruption)
  • wakeup(): makes a thread blocked in select() return immediately; if no thread is currently blocked, the next call to select() returns immediately, as if selectNow() had been executed once
  • close(): closes the Selector when you are done with it and invalidates every SelectionKey registered with it; the channels themselves are not closed

About SelectionKey

When we talk about Selector we have to talk about SelectionKey: the two are closely related and always used together. As shown above, registering a Channel with a Selector returns a SelectionKey object, which contains:

  • the interest set: the set of events the current Channel is interested in, i.e. the interest set passed to register()
  • the ready set
  • the channel
  • the selector
  • an optional attached object

① The interest set can be read and changed through methods of the SelectionKey class

// Returns the set of events currently of interest
int interestSet = key.interestOps();
boolean isInterestedInAccept  = (interestSet & SelectionKey.OP_ACCEPT) != 0;
boolean isInterestedInConnect = (interestSet & SelectionKey.OP_CONNECT) != 0;
boolean isInterestedInRead    = (interestSet & SelectionKey.OP_READ) != 0;
boolean isInterestedInWrite   = (interestSet & SelectionKey.OP_WRITE) != 0;
// Modify the interest set with interestOps(int ops)
key.interestOps(interestSet | SelectionKey.OP_WRITE);

② The ready set is the set of events the current Channel is ready for

int readySet = key.readyOps();
// There are also four methods to check whether each kind of event is ready
key.isReadable();    // is a read event ready?
key.isWritable();    // is a write event ready?
key.isConnectable(); // is a client connect event ready?
key.isAcceptable();  // is a server accept event ready?

The SelectionKey also gives us the current channel and selector:

// Returns the Channel associated with this key; cast it as needed,
// e.g. to ServerSocketChannel or SocketChannel
Channel channel = key.channel();
// Returns the Selector associated with this key
Selector selector = key.selector();

We can also attach an object to a SelectionKey, either afterwards or directly at registration:

key.attach(theObject);
Object attachedObj = key.attachment();
// Or attach the object at registration time
SelectionKey key = channel.register(selector, SelectionKey.OP_READ, theObject);
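epoll has a direct counterpart to attachments: the data field of struct epoll_event is a union, so instead of the fd you can store a pointer to per-connection state and get it back from epoll_wait(). Because it is a union, you keep either the fd or the pointer, so the fd usually lives inside the attached struct. In this sketch, struct conn, attach_conn(), and next_ready() are hypothetical names.

```c
#define _GNU_SOURCE
#include <stddef.h>
#include <sys/epoll.h>
#include <unistd.h>

/* Hypothetical per-connection state: what you might attach to a SelectionKey. */
struct conn {
    int fd;
    int events_handled;
};

/* Like channel.register(selector, OP_READ, c): data.ptr carries the attachment. */
int attach_conn(int epfd, struct conn *c)
{
    struct epoll_event ev = { .events = EPOLLIN, .data.ptr = c };
    return epoll_ctl(epfd, EPOLL_CTL_ADD, c->fd, &ev);
}

/* Like key.attachment() after select(): returns the attachment of the first
 * ready event (counting it as handled), or NULL if nothing became ready. */
struct conn *next_ready(int epfd, int timeout_ms)
{
    struct epoll_event out;
    if (epoll_wait(epfd, &out, 1, timeout_ms) != 1)
        return NULL;
    struct conn *c = out.data.ptr;
    c->events_handled++;
    return c;
}
```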

Great oaks grow from little acorns, and that's enough of the basics. With this in mind you can try some NIO or Netty demos. Now for the main point of this section: epoll ~

We have mentioned OpenJDK many times already. The implementation of Selector is necessarily operating-system specific, so let's take a look at it.

The implementation of Selector is SelectorImpl, which delegates its responsibilities to platform-specific subclasses: EPollSelectorImpl on Linux (shown in the figure), WindowsSelectorImpl on Windows, and KQueueSelectorImpl on macOS.

So Selector.open() gives us a Selector instance; how does that work?

// Selector.java
public static Selector open() throws IOException {
    // Find the provider and have it open the Selector
    return SelectorProvider.provider().openSelector();
}

// java.nio.channels.spi.SelectorProvider
public static SelectorProvider provider() {
    synchronized (lock) {
        if (provider != null)
            return provider;
        return AccessController.doPrivileged(
            new PrivilegedAction<SelectorProvider>() {
                public SelectorProvider run() {
                    if (loadProviderFromProperty())
                        return provider;
                    if (loadProviderAsService())
                        return provider;
                    // This is where the real provider that opens the Selector comes from
                    provider = sun.nio.ch.DefaultSelectorProvider.create();
                    return provider;
                }
            });
    }
}

In OpenJDK, each operating system has its own sun.nio.ch.DefaultSelectorProvider implementation. Take src/solaris/classes/sun/nio/ch/DefaultSelectorProvider.java as an example:

/**
 * Returns the default SelectorProvider.
 */
public static SelectorProvider create() {
    // Obtain the OS name
    String osname = AccessController
        .doPrivileged(new GetPropertyAction("os.name"));
    // Create a different Selector provider depending on the OS name
    if (osname.equals("SunOS"))
        return createProvider("sun.nio.ch.DevPollSelectorProvider");
    if (osname.equals("Linux"))
        return createProvider("sun.nio.ch.EPollSelectorProvider");
    return new sun.nio.ch.PollSelectorProvider();
}

Open EPollSelectorProvider.java under src/solaris/classes/sun/nio/ch:

public class EPollSelectorProvider extends SelectorProviderImpl {
    public AbstractSelector openSelector() throws IOException {
        return new EPollSelectorImpl(this);
    }

    public Channel inheritedChannel() throws IOException {
        return InheritedChannel.getChannel();
    }
}

On Linux this yields the final Selector implementation: EPollSelectorImpl.java under src/solaris/classes/sun/nio/ch.

Take a look at the constructor it implements:

EPollSelectorImpl(SelectorProvider sp) throws IOException {
    super(sp);
    // makePipe returns the two file descriptors of a pipe encoded in one long:
    // the read end in the high 32 bits and the write end in the low 32 bits
    long pipeFds = IOUtil.makePipe(false);
    fd0 = (int) (pipeFds >>> 32);
    fd1 = (int) pipeFds;
    // EPollArrayWrapper creates the epoll instance (epollCreate, shown below)
    pollWrapper = new EPollArrayWrapper();
    // Register the pipe so that wakeup() can interrupt epoll_wait
    pollWrapper.initInterrupt(fd0, fd1);
    fdToKey = new HashMap<>();
}
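The pipe created here is how Selector.wakeup() works: the read end (fd0) is registered with epoll for EPOLLIN, wakeup() writes a byte to the write end (fd1), and doSelect() later drains fd0 (the IOUtil.drain(fd0) call shown further down). Below is a C sketch of that mechanism; struct mini_selector and the mini_* functions are hypothetical names, not JDK code.

```c
#define _GNU_SOURCE
#include <fcntl.h>
#include <sys/epoll.h>
#include <unistd.h>

/* An epoll instance plus a wakeup pipe, mirroring fd0/fd1. */
struct mini_selector {
    int epfd;
    int wake_rd;   /* like fd0: registered for EPOLLIN */
    int wake_wr;   /* like fd1: written by wakeup() */
};

void mini_close(struct mini_selector *s)
{
    if (s->epfd >= 0) close(s->epfd);
    if (s->wake_rd >= 0) close(s->wake_rd);
    if (s->wake_wr >= 0) close(s->wake_wr);
}

/* Like the constructor plus initInterrupt(fd0, fd1). */
int mini_open(struct mini_selector *s)
{
    int p[2];
    s->epfd = s->wake_rd = s->wake_wr = -1;
    if (pipe(p) < 0)
        return -1;
    s->wake_rd = p[0];
    s->wake_wr = p[1];
    s->epfd = epoll_create1(0);
    struct epoll_event ev = { .events = EPOLLIN, .data.fd = s->wake_rd };
    /* Non-blocking read end, so draining it can never block. */
    if (s->epfd < 0 || fcntl(s->wake_rd, F_SETFL, O_NONBLOCK) < 0 ||
        epoll_ctl(s->epfd, EPOLL_CTL_ADD, s->wake_rd, &ev) < 0) {
        mini_close(s);
        return -1;
    }
    return 0;
}

/* Like Selector.wakeup(): one byte makes the pipe readable, so a blocked
 * or the next epoll_wait() returns at once. */
int mini_wakeup(struct mini_selector *s)
{
    char b = 1;
    return write(s->wake_wr, &b, 1) == 1 ? 0 : -1;
}

/* Like doSelect(): waits, and if the wakeup fd fired, drains the pipe so
 * later waits block normally again. Sets *woken and returns the number of
 * other ready fds, or -1 on error. */
int mini_select(struct mini_selector *s, int timeout_ms, int *woken)
{
    struct epoll_event events[16];
    char buf[64];
    int others = 0;
    *woken = 0;
    int n = epoll_wait(s->epfd, events, 16, timeout_ms);
    if (n < 0)
        return -1;
    for (int i = 0; i < n; i++) {
        if (events[i].data.fd == s->wake_rd) {
            while (read(s->wake_rd, buf, sizeof buf) > 0)
                ;   /* drain until EAGAIN */
            *woken = 1;
        } else {
            others++;
        }
    }
    return others;
}
```

Calling mini_wakeup() before mini_select() with timeout -1 makes that select return immediately, which matches the wakeup() behavior described in the Selector method list. The JDK also appears to guard wakeup() with the interruptTriggered flag seen in doSelect(), so repeated wakeups write to the pipe only once.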

And EPollArrayWrapper.c under src/solaris/native/sun/nio/ch:

JNIEXPORT jint JNICALL
Java_sun_nio_ch_EPollArrayWrapper_epollCreate(JNIEnv *env, jobject this)
{
    /*
     * epoll_create expects a size as a hint to the kernel about how to
     * dimension internal structures. We can't predict the size in advance.
     */
    int epfd = epoll_create(256);
    if (epfd < 0) {
       JNU_ThrowIOExceptionWithLastError(env, "epoll_create failed");
    }
    return epfd;
}
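For comparison, here is a small C sketch of the two ways to create an epoll instance. `open_epoll_legacy` and `open_epoll` are hypothetical names. Per the epoll_create(2) man page, the size argument has been ignored since Linux 2.6.8 but must still be positive, and `epoll_create1` (Linux 2.6.27+) replaces it with a flags argument such as `EPOLL_CLOEXEC`.

```c
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <sys/epoll.h>
#include <unistd.h>

/* What the JDK does: the 256 is only a sizing hint for the kernel. */
static int open_epoll_legacy(void)
{
    return epoll_create(256);
}

/* Modern form: no size hint; EPOLL_CLOEXEC closes the fd across exec(). */
static int open_epoll(void)
{
    return epoll_create1(EPOLL_CLOEXEC);
}
```

Either way, the result is an ordinary file descriptor that the JDK stores in `epfd` and passes to every later `epoll_ctl` and `epoll_wait` call.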

① epoll_create has already been covered earlier, so I won't repeat it here.

② epoll_wait: waiting for kernel I/O events

Calling Selector.select(), which returns the number of keys whose ready sets were updated (possibly zero), eventually delegates to the doSelect method of the platform implementation. I won't post every detail here; let's look at EPollSelectorImpl's doSelect method:

protected int doSelect(long timeout) throws IOException {
    if (closed)
        throw new ClosedSelectorException();
    processDeregisterQueue();
    try {
        begin();
        // pollWrapper is the EPollArrayWrapper
        pollWrapper.poll(timeout);
    } finally {
        end();
    }
    processDeregisterQueue();
    int numKeysUpdated = updateSelectedKeys();
    if (pollWrapper.interrupted()) {
        // Clear the wakeup pipe
        pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);
        synchronized (interruptLock) {
            pollWrapper.clearInterrupted();
            IOUtil.drain(fd0);
            interruptTriggered = false;
        }
    }
    return numKeysUpdated;
}
// EPollArrayWrapper.java
int poll(long timeout) throws IOException {
    // Flush pending registration changes; this involves epoll_ctl and is described below
    updateRegistrations();
    // epollWait should look familiar: it is the native wrapper around epoll_wait
    updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd);
    for (int i = 0; i < updated; i++) {
        if (getDescriptor(i) == incomingInterruptFD) {
            interruptedIndex = i;
            interrupted = true;
            break;
        }
    }
    return updated;
}
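The interrupt handling above boils down to two steps: find the wakeup descriptor among the ready events, then drain the pipe so it stops being readable. A C sketch of both steps, with hypothetical helper names `find_interrupt` and `drain` (the latter only loosely modelled on `IOUtil.drain`). It assumes the read end is non-blocking, so the loop ends at `EAGAIN`; otherwise the level-triggered `EPOLLIN` would keep firing on every call.

```c
#include <assert.h>
#include <errno.h>
#include <fcntl.h>      /* O_NONBLOCK, used when setting up the pipe */
#include <sys/epoll.h>
#include <unistd.h>

/* Like the loop in poll(): index of the wakeup fd among the ready events,
 * or -1 if the select was not interrupted. */
static int find_interrupt(const struct epoll_event *events, int n, int interrupt_fd)
{
    for (int i = 0; i < n; i++)
        if (events[i].data.fd == interrupt_fd)
            return i;
    return -1;
}

/* Read a non-blocking fd until it is empty. Returns bytes read, or -1 on error. */
static long drain(int fd)
{
    char buf[128];
    long total = 0;
    for (;;) {
        ssize_t n = read(fd, buf, sizeof buf);
        if (n > 0) {
            total += n;
            continue;
        }
        if (n == 0 || errno == EAGAIN || errno == EWOULDBLOCK)
            return total;           /* empty (or closed): done */
        if (errno != EINTR)
            return -1;              /* real error; EINTR just retries */
    }
}
```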

Now look at EPollArrayWrapper.c:

JNIEXPORT jint JNICALL
Java_sun_nio_ch_EPollArrayWrapper_epollWait(JNIEnv *env, jobject this,
                                            jlong address, jint numfds,
                                            jlong timeout, jint epfd)
{
    struct epoll_event *events = jlong_to_ptr(address);
    int res;
    if (timeout <= 0) {           /* Indefinite wait or no wait */
        RESTARTABLE(epoll_wait(epfd, events, numfds, timeout), res);
    } else {                      /* Bounded wait; bounded restarts */
        res = iepoll(epfd, events, numfds, timeout);
    }
    if (res < 0) {
        JNU_ThrowIOExceptionWithLastError(env, "epoll_wait failed");
    }
    return res;
}

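So on Linux, Selector.select() ultimately calls epoll_wait. RESTARTABLE retries the call when it is interrupted by a signal (EINTR); for a bounded timeout, iepoll does the same while shrinking the remaining timeout.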

③ epoll_ctl: how OpenJDK manages event registration

In the JDK, registering a Channel with a Selector produces a SelectionKey, which records the events the Channel is interested in: OP_READ, OP_WRITE, OP_CONNECT, and OP_ACCEPT.

Calling **SelectableChannel.register()** (and any later change to a key's interest set) records the pending events in the EPollArrayWrapper.java member variables eventsLow and eventsHigh:

// events for file descriptors with registration changes pending, indexed
// by file descriptor and stored as bytes for efficiency reasons. For
// file descriptors higher than MAX_UPDATE_ARRAY_SIZE (unlimited case at
// least) then the update is stored in a map.
private final byte[] eventsLow = new byte[MAX_UPDATE_ARRAY_SIZE];
private Map<Integer,Byte> eventsHigh;
/**
 * Sets the pending update events for the given file descriptor. This
 * method has no effect if the update events is already set to KILLED,
 * unless {@code force} is {@code true}.
 */
private void setUpdateEvents(int fd, byte events, boolean force) {
    if (fd < MAX_UPDATE_ARRAY_SIZE) {
        if ((eventsLow[fd] != KILLED) || force) {
            eventsLow[fd] = events;
        }
    } else {
        Integer key = Integer.valueOf(fd);
        if (!isEventsHighKilled(key) || force) {
            eventsHigh.put(key, Byte.valueOf(events));
        }
    }
}
/**
 * Returns the pending update events for the given file descriptor.
 */
private byte getUpdateEvents(int fd) {
    if (fd < MAX_UPDATE_ARRAY_SIZE) {
        return eventsLow[fd];
    } else {
        Byte result = eventsHigh.get(Integer.valueOf(fd));
        // result should never be null
        return result.byteValue();
    }
}
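The two-tier layout (a flat array for small descriptors, a map for large ones) can be sketched in C as follows. Everything here is an assumption for illustration: `MAX_UPDATE_ARRAY_SIZE` is shrunk to 64, `KILLED` is modelled as `-1`, and a small growable array with linear search stands in for Java's `HashMap<Integer,Byte>` to keep the sketch short.

```c
#include <assert.h>
#include <stdlib.h>

#define MAX_UPDATE_ARRAY_SIZE 64          /* hypothetical; the JDK's limit is larger */
#define KILLED ((signed char)-1)          /* "ignore further updates" marker */

struct high_entry { int fd; signed char events; };

static signed char events_low[MAX_UPDATE_ARRAY_SIZE];   /* fast path, indexed by fd */
static struct high_entry *events_high;                   /* overflow for large fds */
static size_t high_len, high_cap;

static struct high_entry *find_high(int fd)
{
    for (size_t i = 0; i < high_len; i++)
        if (events_high[i].fd == fd)
            return &events_high[i];
    return NULL;
}

/* Record pending events; once KILLED, only a forced update overrides it. */
static void set_update_events(int fd, signed char events, int force)
{
    if (fd < MAX_UPDATE_ARRAY_SIZE) {
        if (events_low[fd] != KILLED || force)
            events_low[fd] = events;
        return;
    }
    struct high_entry *e = find_high(fd);
    if (e != NULL) {
        if (e->events != KILLED || force)
            e->events = events;
        return;
    }
    if (high_len == high_cap) {                          /* grow the overflow table */
        size_t cap = high_cap ? high_cap * 2 : 8;
        struct high_entry *p = realloc(events_high, cap * sizeof *p);
        if (p == NULL)
            abort();
        events_high = p;
        high_cap = cap;
    }
    events_high[high_len++] = (struct high_entry){ fd, events };
}

static signed char get_update_events(int fd)
{
    if (fd < MAX_UPDATE_ARRAY_SIZE)
        return events_low[fd];
    struct high_entry *e = find_high(fd);
    return e ? e->events : 0;             /* the JDK assumes the entry exists */
}
```

Most servers use small descriptor numbers, so nearly every lookup stays on the array path and avoids boxing or hashing.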

These pending updates are consumed by the poll code shown above, through updateRegistrations():

// EPollArrayWrapper.java
int poll(long timeout) throws IOException {
    updateRegistrations();
    // ...
}
/**
 * Update the pending registrations.
 */
private void updateRegistrations() {
    synchronized (updateLock) {
        int j = 0;
        while (j < updateCount) {
            int fd = updateDescriptors[j];
            short events = getUpdateEvents(fd);
            boolean isRegistered = registered.get(fd);
            int opcode = 0;
            if (events != KILLED) {
                // Decide which epoll_ctl operation to send.
                // The JDK never sets EPOLLET, so registrations are level-triggered.
                if (isRegistered) {
                    opcode = (events != 0) ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
                } else {
                    opcode = (events != 0) ? EPOLL_CTL_ADD : 0;
                }
                if (opcode != 0) {
                    epollCtl(epfd, opcode, fd, events);
                    if (opcode == EPOLL_CTL_ADD) {
                        registered.set(fd);
                    } else if (opcode == EPOLL_CTL_DEL) {
                        registered.clear(fd);
                    }
                }
            }
            j++;
        }
        updateCount = 0;
    }
}
private native void epollCtl(int epfd, int opcode, int fd, int events);
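The opcode decision in updateRegistrations() is a small table. Here it is as a C sketch against a real epoll instance; `choose_opcode` and `apply_update` are hypothetical helpers, `KILLED` is modelled as `-1`, and a plain `int` flag stands in for the `registered` BitSet. As in the JDK, no `EPOLLET` flag is set, so the registration is level-triggered.

```c
#include <assert.h>
#include <sys/epoll.h>
#include <unistd.h>

#define KILLED (-1)   /* hypothetical "ignore this update" marker */

/* registered + events != 0 -> MOD,  registered + events == 0 -> DEL,
 * unregistered + events != 0 -> ADD, otherwise -> 0 (nothing to do). */
static int choose_opcode(int is_registered, int events)
{
    if (events == KILLED)
        return 0;
    if (is_registered)
        return events != 0 ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
    return events != 0 ? EPOLL_CTL_ADD : 0;
}

/* Apply one pending update and keep the registration flag in sync.
 * Returns the opcode used, 0 if nothing was sent, or -1 on error. */
static int apply_update(int epfd, int fd, int events, int *is_registered)
{
    int opcode = choose_opcode(*is_registered, events);
    if (opcode == 0)
        return 0;
    struct epoll_event ev = { .events = (unsigned)events, .data.fd = fd };
    if (epoll_ctl(epfd, opcode, fd, &ev) != 0)
        return -1;
    if (opcode == EPOLL_CTL_ADD)
        *is_registered = 1;
    else if (opcode == EPOLL_CTL_DEL)
        *is_registered = 0;
    return opcode;
}
```

Batching the changes and flushing them just before `epoll_wait` means a channel whose interest set changes several times between two selects costs only one `epoll_ctl` call.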

epollCtl is a native method, so let's go into EPollArrayWrapper.c:

JNIEXPORT void JNICALL
Java_sun_nio_ch_EPollArrayWrapper_epollCtl(JNIEnv *env, jobject this, jint epfd,
                                           jint opcode, jint fd, jint events)
{
    struct epoll_event event;
    int res;
    event.events = events;
    event.data.fd = fd;
    RESTARTABLE(epoll_ctl(epfd, (int)opcode, (int)fd, &event), res);
    /*
     * A channel may be registered with several Selectors. When each Selector
     * is polled a EPOLL_CTL_DEL op will be inserted into its pending update
     * list to remove the file descriptor from epoll. The "last" Selector will
     * close the file descriptor which automatically unregisters it from each
     * epoll descriptor. To avoid costly synchronization between Selectors we
     * allow pending updates to be processed, ignoring errors. The errors are
     * harmless as the last update for the file descriptor is guaranteed to
     * be EPOLL_CTL_DEL.
     */
    if (res < 0 && errno != EBADF && errno != ENOENT && errno != EPERM) {
        JNU_ThrowIOExceptionWithLastError(env, "epoll_ctl failed");
    }
}
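The three ignored errors are easy to reproduce. The C sketch below uses a hypothetical helper `ctl_errno` that returns `errno` from `epoll_ctl`, and `is_ignorable` restates the JDK's policy. Following the epoll_ctl(2) man page: deleting a descriptor that was never added gives `ENOENT`, using a descriptor that has already been closed gives `EBADF`, and adding a regular file (which does not support epoll) gives `EPERM`.

```c
#include <assert.h>
#include <errno.h>
#include <stdio.h>
#include <sys/epoll.h>
#include <unistd.h>

/* Returns 0 on success, otherwise the errno set by epoll_ctl. */
static int ctl_errno(int epfd, int op, int fd, unsigned events)
{
    struct epoll_event ev = { .events = events, .data.fd = fd };
    return epoll_ctl(epfd, op, fd, &ev) == 0 ? 0 : errno;
}

/* Same policy as the JDK native code: these are stale-update leftovers. */
static int is_ignorable(int err)
{
    return err == 0 || err == EBADF || err == ENOENT || err == EPERM;
}
```

Any other error, such as `ENOMEM` or `EINVAL`, still turns into an `IOException` on the Java side.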

After poll returns, doSelect calls updateSelectedKeys in EPollSelectorImpl.java, which updates the selected-key set, one of the three key sets a Selector maintains (see the previous section).

/**
 * Update the keys that have been selected by epoll.
 * Add ready interest set to ready queue.
 */
private int updateSelectedKeys() {
    int entries = pollWrapper.updated;
    int numKeysUpdated = 0;
    for (int i = 0; i < entries; i++) {
        int nextFD = pollWrapper.getDescriptor(i);
        SelectionKeyImpl ski = fdToKey.get(Integer.valueOf(nextFD));
        // ski is null in the case of an interrupt
        if (ski != null) {
            int rOps = pollWrapper.getEventOps(i);
            if (selectedKeys.contains(ski)) {
                if (ski.channel.translateAndSetReadyOps(rOps, ski)) {
                    numKeysUpdated++;
                }
            } else {
                ski.channel.translateAndSetReadyOps(rOps, ski);
                if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
                    selectedKeys.add(ski);
                    numKeysUpdated++;
                }
            }
        }
    }
    return numKeysUpdated;
}
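To make the translation step concrete, here is a simplified C model of updateSelectedKeys() for a single connected socket. It is a sketch under heavy assumptions: `struct key`, `translate_and_set_ready_ops`, and `update_key` are hypothetical, the `OP_*` values copy the bit values of `java.nio.channels.SelectionKey`, and connect/accept handling from the real channel classes is left out. The rules are that an error or hang-up makes every interest op ready, `EPOLLIN` maps to `OP_READ`, and `EPOLLOUT` maps to `OP_WRITE`, all masked by the interest set.

```c
#include <assert.h>
#include <sys/epoll.h>

/* Bit values of java.nio.channels.SelectionKey */
enum { OP_READ = 1 << 0, OP_WRITE = 1 << 2, OP_CONNECT = 1 << 3, OP_ACCEPT = 1 << 4 };

/* Hypothetical, simplified selection key for a connected socket. */
struct key { int interest_ops; int ready_ops; int selected; };

/* Overwrite the ready set from epoll's revents; return 1 if new bits appeared. */
static int translate_and_set_ready_ops(unsigned revents, struct key *k)
{
    int old = k->ready_ops, ops = 0;
    if (revents & (EPOLLERR | EPOLLHUP)) {
        ops = k->interest_ops;      /* let the app discover the error on its next I/O call */
    } else {
        if ((revents & EPOLLIN) && (k->interest_ops & OP_READ))
            ops |= OP_READ;
        if ((revents & EPOLLOUT) && (k->interest_ops & OP_WRITE))
            ops |= OP_WRITE;
    }
    k->ready_ops = ops;
    return (ops & ~old) != 0;
}

/* The two branches of updateSelectedKeys() for one ready descriptor. */
static int update_key(unsigned revents, struct key *k)
{
    if (k->selected)                             /* already in selectedKeys */
        return translate_and_set_ready_ops(revents, k);
    translate_and_set_ready_ops(revents, k);
    if (k->ready_ops & k->interest_ops) {        /* newly selected */
        k->selected = 1;
        return 1;
    }
    return 0;
}
```

The return value is what feeds `numKeysUpdated`, which is why `select()` can return 0 even when epoll reported activity, for example on a socket that is writable but not registered for `OP_WRITE`.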

Conclusion

After reading this article, you should know the basics of channels and selectors and how Java uses epoll, including in more detail how file descriptors relate to channels and sockets. With these basics in mind, the source code of NIO and of network frameworks such as Netty should be much less daunting. I will cover Netty in the next article, since it has become the mainstream framework for distributed network communication!

References

zh.wikipedia.org/wiki/Epoll (Wikipedia)

baike.baidu.com/item/epoll/… (Baidu Baike)

juejin.im/entry/68449… (Juejin)

www.jianshu.com/p/f26f1eaa7… (Jianshu)