preface

This series of articles will explain the evolution of NIO and pave the way for the reactor-Netty library.

About THE Java programming methodology — Reactor and Webflux video sharing, has completed Rxjava and Reactor, site B address is as follows:

Rxjava source code reading and sharing: www.bilibili.com/video/av345…

Reactor source code reading and sharing: www.bilibili.com/video/av353…

This series of source code interpretation based on JDK11 API details may differ from other versions, please solve JDK version issues.

Previous installments in this series:

BIO to NIO source code some things BIO

BIO to NIO source code for something on NIO

BIO to NIO source code for something in NIO

The introduction of SelectionKey

As we said earlier, once the student is identified, we set its state, and then we give it to the Selector to manage, and we set its state through the SelectionKey.

Let’s start with the register method under SelectableChannel, which we didn’t cover in detail in Channel. As we mentioned earlier, SelectableChannel makes channels that can be multiplexed by Selector. As a manager, a channel must register with the manager if it wants to reuse it. Therefore, the register method under SelectableChannel is the core of our second attention, which is also the entry point for our next content, the interpretation of the Register method, Take a look at our previous article BIO to NIO source code for some of the things that give channels the ability to be multiplexed on NIO.

The thing to remember here is that SelectableChannel is the key to channel characteristics. It’s kind of like table design. You could have put characteristics and things in a table, but in order to be more specific, in order to make the code function more manageable, So they took out and designed a second table, and this is kind of like a human organ, where they all work together to do one thing, but inside they focus on their own main specific function, and occasionally have some small functions of other organs.

Therefore, we can also know that the SelectionKey represents a SelectableChannel associated with a Selector, which can be simply understood as a token. It is just like a token that the foreground will get from the background when users log in to the permission management system. Users can access the corresponding resource information by virtue of this token.

//java.nio.channels.spi.AbstractSelectableChannel#register
public final SelectionKey register(Selector sel, int ops, Object att)
    throws ClosedChannelException
{...synchronized (regLock) {
       ...
        synchronized (keyLock) {
           ...
            SelectionKey k = findKey(sel);
            if(k ! =null) {
                k.attach(att);
                k.interestOps(ops);
            } else {
                // New registration
                k = ((AbstractSelector)sel).register(this, ops, att);
                addKey(k);
            }
            returnk; }}}Copy the code

The combination creates and returns a SelectionKey each time a Selector registers a channel using the Register method.

//sun.nio.ch.SelectorImpl#register
@Override
protected final SelectionKey register(AbstractSelectableChannel ch,
                                        int ops,
                                        Object attachment)
{
    if(! (chinstanceof SelChImpl))
        throw new IllegalSelectorException();
    SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);
    k.attach(attachment);

    // register (if needed) before adding to key set
    implRegister(k);

    // add to the selector's key set, removing it immediately if the selector
    // is closed. The key is not in the channel's key set at this point but
    // it may be observed by a thread iterating over the selector's key set.
    keys.add(k);
    try {
        k.interestOps(ops);
    } catch (ClosedSelectorException e) {
        assert ch.keyFor(this) = =null;
        keys.remove(k);
        k.cancel();
        throw e;
    }
    return k;
}
Copy the code

We saw in the BIO to NIO source code that gives a Channel the ability to be multiplexed on NIO, that once a Channel is registered with a Selector, it stays registered until it’s unregistered. When unregistered, all resources assigned to a Channel by Selector are unregistered. That is, the SelectionKey is valid until it calls the SelectionKey#channel method, or the channel represented by the key is closed, or the Selector associated with the key is closed. As we also learned in our analysis of the previous article, canceling a SelectionKey is not immediately removed from the Selector, it is added to the Selector’s cancelledKeys Set so that it can be removed during the next selection, We can through the Java nio. Channels. SelectionKey# isValid to judge whether a SelectionKey effectively.

The SelectionKey contains four sets of operations, each of which is represented by an Int. The lower bits of the Int value are used to indicate the optional types of operations supported by the channel.


   /** * Operation-set bit for read operations. */
   public static final int OP_READ = 1 << 0;

   /** * Operation-set bit for write operations. */
   public static final int OP_WRITE = 1 << 2;

   /** * Operation-set bit for socket-connect operations. */
   public static final int OP_CONNECT = 1 << 3;

   /** * Operation-set bit for socket-accept operations. */
   public static final int OP_ACCEPT = 1 << 4;
Copy the code

interestOps

InterestOps determines which selector classes will be tested for readiness during the next selection operation, and whether the operation event is of interest to the channel. InterestOps in SelectionKey created, initialized to register when the Selector ops value, this value can be through the sun. The nio. Ch. SelectionKeyImpl# interestOps (int) to change, We can see this clearly in the SelectorImpl# Register.

//sun.nio.ch.SelectionKeyImpl
public final class SelectionKeyImpl
   extends AbstractSelectionKey
{
   private static final VarHandle INTERESTOPS =
           ConstantBootstraps.fieldVarHandle(
                   MethodHandles.lookup(),
                   "interestOps",
                   VarHandle.class,
                   SelectionKeyImpl.class, int.class);

   private final SelChImpl channel;
   private final SelectorImpl selector;

   private volatile int interestOps;
   private volatile int readyOps;

   // registered events in kernel, used by some Selector implementations
   private int registeredEvents;

   // index of key in pollfd array, used by some Selector implementations
   private intindex; SelectionKeyImpl(SelChImpl ch, SelectorImpl sel) { channel = ch; selector = sel; }... }Copy the code

readyOps

ReadyOps represents an action event that has been detected by Selector that a channel is ready. When the SelectionKey is created (as shown in the source code above), the readyOps value is 0, which may be updated during the Selector select operation, but it is important to note that we cannot update it directly.

The readyOps of SelectionKey indicates that a channel is ready for some operation, but there is no guarantee that it will not block during an operation on the ready event type, that is, the thread on which the operation occurs may block. In most cases, readyOps is updated immediately after the SELECT operation is complete, when the readyOps value is most accurate, and may not be accurate if there are external events or IO operations on the channel. So, we see that it is volatile.

The SelectionKey defines all operation events, but the operation events supported by a specific channel depend on the specific channel. All optional channels (subclasses of SelectableChannel) can use the SelectableChannel#validOps method to determine whether an operation event is supported by a channel. Each subclass has an implementation of validOps that returns a number. Identifies only which operations a channel supports. Attempting to set or test an operation setting that is not supported by a channel will throw an associated runtime exception. Ops supported by different application scenarios are different, and the extracted parts are as follows:

//java.nio.channels.SocketChannel#validOps
public final int validOps(a) {
    / / 1 | | 4 8 1101
    return (SelectionKey.OP_READ
            | SelectionKey.OP_WRITE
            | SelectionKey.OP_CONNECT);
}
//java.nio.channels.ServerSocketChannel#validOps
public final int validOps(a) {
    / / 16
    return SelectionKey.OP_ACCEPT;
}
//java.nio.channels.DatagramChannel#validOps
public final int validOps(a) {
    / / 1 | 4
    return (SelectionKey.OP_READ
            | SelectionKey.OP_WRITE);
}
Copy the code

If we need to frequently associate some data specified in our program to the SelectionKey, for example, we use an object to represent the state of a high-level protocol. Object is used to inform the implementation protocol handler. So, SelectionKey supports attaching an object to the Attachment of SelectionKey via the attach method. Attachment can be done by Java. Nio. Channels. SelectionKey# attachment method for a visit. To cancel this object, use selectionKey.attach(null).

If the object is no longer used, the SelectionKey must be removed manually. If not, the SelectionKey will always exist. Because this is a strong reference, the garbage collector will not collect the object, otherwise it will be a memory leak.

SelectionKey is thread-safe when used concurrently by multiple threads. All we need to know is that the Selector select operation will always use the current interestOps value set at the beginning of the call to the operation.

The Selector to explore

So now that we’ve touched a little bit on Selector, it’s pretty clear what the role is, so let’s dig a little bit deeper into how Selector design works where we’ve touched on it.

Open method of Selector

SelectableChannel objects rely on SelectableChannel objects for multiplexing. We can call Java nio. Channels. Selector# open to create a selector that object:

//java.nio.channels.Selector#open
public static Selector open(a) throws IOException {
    return SelectorProvider.provider().openSelector();
}
Copy the code

About the SelectorProvider provider (), which USES according to the system’s default implementation, I here is the Windows system, then the default implementation for the sun. Nio. Ch. WindowsSelectorProvider, in this way, You can invoke a concrete implementation based on the corresponding system.

//java.nio.channels.spi.SelectorProvider#provider
public static SelectorProvider provider(a) {
    synchronized (lock) {
        if(provider ! =null)
            return provider;
        return AccessController.doPrivileged(
            new PrivilegedAction<>() {
                public SelectorProvider run(a) {
                        if (loadProviderFromProperty())
                            return provider;
                        if (loadProviderAsService())
                            return provider;
                        provider = sun.nio.ch.DefaultSelectorProvider.create();
                        returnprovider; }}); }}//sun.nio.ch.DefaultSelectorProvider
public class DefaultSelectorProvider {

/** * Prevent instantiation. */
private DefaultSelectorProvider(a) {}/** * Returns the default SelectorProvider. */
public static SelectorProvider create(a) {
    return newsun.nio.ch.WindowsSelectorProvider(); }}Copy the code

Based on the Windows, the selector will use the sun here. Nio. Ch. WindowsSelectorImpl do some core logic.

public class WindowsSelectorProvider extends SelectorProviderImpl {

    public AbstractSelector openSelector(a) throws IOException {
        return new WindowsSelectorImpl(this); }}Copy the code

Here, we need to look at the constructor of WindowsSelectorImpl:

//sun.nio.ch.WindowsSelectorImpl#WindowsSelectorImpl
WindowsSelectorImpl(SelectorProvider sp) throws IOException {
    super(sp);
    pollWrapper = new PollArrayWrapper(INIT_CAP);
    wakeupPipe = Pipe.open();
    wakeupSourceFd = ((SelChImpl)wakeupPipe.source()).getFDVal();

    // Disable the Nagle algorithm so that the wakeup is more immediate
    SinkChannelImpl sink = (SinkChannelImpl)wakeupPipe.sink();
    (sink.sc).socket().setTcpNoDelay(true);
    wakeupSinkFd = ((SelChImpl)sink).getFDVal();

    pollWrapper.addWakeupSocket(wakeupSourceFd, 0);
}
Copy the code

We know from pipe.open () that the selector remains open until it calls its close method:

//java.nio.channels.spi.AbstractSelector#close
public final void close(a) throws IOException {
    boolean open = selectorOpen.getAndSet(false);
    if(! open)return;
    implCloseSelector();
}
//sun.nio.ch.SelectorImpl#implCloseSelector
@Override
public final void implCloseSelector(a) throws IOException {
    wakeup();
    synchronized (this) {
        implClose();
        synchronized (publicSelectedKeys) {
            // Deregister channels
            Iterator<SelectionKey> i = keys.iterator();
            while (i.hasNext()) {
                SelectionKeyImpl ski = (SelectionKeyImpl)i.next();
                deregister(ski);
                SelectableChannel selch = ski.channel();
                if(! selch.isOpen() && ! selch.isRegistered()) ((SelChImpl)selch).kill(); selectedKeys.remove(ski); i.remove(); }assertselectedKeys.isEmpty() && keys.isEmpty(); }}}//sun.nio.ch.WindowsSelectorImpl#implClose
@Override
protected void implClose(a) throws IOException {
    assert! isOpen();assert Thread.holdsLock(this);

    // prevent further wakeup
    synchronized (interruptLock) {
        interruptTriggered = true;
    }

    wakeupPipe.sink().close();
    wakeupPipe.source().close();
    pollWrapper.free();

    // Make all remaining helper threads exit
    for (SelectThread t: threads)
            t.makeZombie();
    startLock.startThreads();
}

Copy the code

As you can see, the previous wakeupPipe is closed in the close method. WakeupPipe. Sink () and wakeupPipe. Source () are closed and pollwrapper.free () is released. Selectorimpl (SelectorProvider SP);

  • To create aPollArrayWrapperObject (pollWrapper);
  • Pipe.open()Open a pipe;
  • getwakeupSourceFdandwakeupSinkFdTwo file descriptors;
  • Add the file descriptor on the Source side of the PIPE (wakeupSourceFd) on thepollWrapper;

Light Pipe. The open ()

Here we get confused about why we created a pipe and what it was for.

Pipe.open()

//java.nio.channels.Pipe#open
public static Pipe open(a) throws IOException {
    return SelectorProvider.provider().openPipe();
}
//sun.nio.ch.SelectorProviderImpl#openPipe
public Pipe openPipe(a) throws IOException {
    return new PipeImpl(this);
}
//sun.nio.ch.PipeImpl#PipeImpl
PipeImpl(final SelectorProvider sp) throws IOException {
    try {
        AccessController.doPrivileged(new Initializer(sp));
    } catch (PrivilegedActionException x) {
        throw(IOException)x.getCause(); }}private class Initializer
implements PrivilegedExceptionAction<Void>
{

private final SelectorProvider sp;

private IOException ioe = null;

private Initializer(SelectorProvider sp) {
    this.sp = sp;
}

@Override
public Void run(a) throws IOException {
    LoopbackConnector connector = new LoopbackConnector();
    connector.run();
    if (ioe instanceof ClosedByInterruptException) {
        ioe = null;
        Thread connThread = new Thread(connector) {
            @Override
            public void interrupt(a) {}}; connThread.start();for (;;) {
            try {
                connThread.join();
                break;
            } catch (InterruptedException ex) {}
        }
        Thread.currentThread().interrupt();
    }

    if(ioe ! =null)
        throw new IOException("Unable to establish loopback connection", ioe);

    return null;
}
Copy the code

We can know from this source, created a PipeImpl object, in PipeImpl constructor executes the AccessController. DoPrivileged, then will perform, Initializer after it calls the run method:

//sun.nio.ch.PipeImpl.Initializer.LoopbackConnector
private class LoopbackConnector implements Runnable {

    @Override
    public void run(a) {
        ServerSocketChannel ssc = null;
        SocketChannel sc1 = null;
        SocketChannel sc2 = null;

        try {
            // Create secret with a backing array.
            ByteBuffer secret = ByteBuffer.allocate(NUM_SECRET_BYTES);
            ByteBuffer bb = ByteBuffer.allocate(NUM_SECRET_BYTES);

            // Loopback address
            InetAddress lb = InetAddress.getLoopbackAddress();
            assert(lb.isLoopbackAddress());
            InetSocketAddress sa = null;
            for(;;) {
                // Bind ServerSocketChannel to a port on the loopback
                // address
                if (ssc == null| |! ssc.isOpen()) { ssc = ServerSocketChannel.open(); ssc.socket().bind(new InetSocketAddress(lb, 0));
                    sa = new InetSocketAddress(lb, ssc.socket().getLocalPort());
                }

                // Establish connection (assume connections are eagerly
                // accepted)
                sc1 = SocketChannel.open(sa);
                RANDOM_NUMBER_GENERATOR.nextBytes(secret.array());
                do {
                    sc1.write(secret);
                } while (secret.hasRemaining());
                secret.rewind();

                // Get a connection and verify it is legitimate
                sc2 = ssc.accept();
                do {
                    sc2.read(bb);
                } while (bb.hasRemaining());
                bb.rewind();

                if (bb.equals(secret))
                    break;

                sc2.close();
                sc1.close();
            }

            // Create source and sink channels
            source = new SourceChannelImpl(sp, sc1);
            sink = new SinkChannelImpl(sp, sc2);
        } catch (IOException e) {
            try {
                if(sc1 ! =null)
                    sc1.close();
                if(sc2 ! =null)
                    sc2.close();
            } catch (IOException e2) {}
            ioe = e;
        } finally {
            try {
                if(ssc ! =null)
                    ssc.close();
            } catch (IOException e2) {}
        }
    }
}
}
Copy the code

In Windows, create two local Socketchannels and then connect (write random data to check the connection between the two sockets). The two Socketchannels implement the source and sink ends of the pipe respectively. If you’re familiar with system call C/C++, you’ll know that a thread blocking on a SELECT can be woken up in one of three ways:

  1. Data is available to read/write, or an exception occurs.
  2. The blocking time is up, i.etime out.
  3. Receive anon-blockThe signal. Can be made ofkillorpthread_killSend out.

So, Selector. Wakeup () can only wakeup a blocked select using one of three methods:

  • The second option can be ruled out becauseselectOnce blocked, it cannot be modifiedtime outTime.
  • And the third seems to be only possibleLinuxOn the implementation,WindowsThere is no such signalling mechanism on.

Looks like there’s only one way. If we call Selector. Open () multiple times, then on Windows we will establish a TCP connection between ourselves and our loopback each time we call Selector. On Linux, a pair of pipes are opened on each call. By this point, we can guess that if we want to wake up the select, we just need to send data to our loopback connection. You can wake up the thread blocking on select.

To summarize, under Windows, the Java virtual machine establishes a LOOPback TCP connection with itself when select.open (); Under Linux, the Selector creates the pipe. Wakeup () makes it easy to wakeup a thread blocking on a select() system call by writing anything to the TCP connection or pipe it has set up.

PollArrayWrapper interpretation

In WindowsSelectorImpl constructor in the end, we see that a code: pollWrapper. AddWakeupSocket (wakeupSourceFd, 0); That is, put the file descriptor (wakeupSourceFd) on the Source side of the pipe into the pollWrapper. PollWrapper is an instance of PollArrayWrapper, and we’ll explore what it is in this section.

class PollArrayWrapper {

    private AllocatedNativeObject pollArray; // The fd array

    long pollArrayAddress; // pollArrayAddress

    @Native private static final short FD_OFFSET     = 0; // fd offset in pollfd
    @Native private static final short EVENT_OFFSET  = 4; // events offset in pollfd

    static short SIZE_POLLFD = 8; // sizeof pollfd struct

    private int size; // Size of the pollArray

    PollArrayWrapper(int newSize) {
        int allocationSize = newSize * SIZE_POLLFD;
        pollArray = new AllocatedNativeObject(allocationSize, true);
        pollArrayAddress = pollArray.address();
        this.size = newSize; }...// Access methods for fd structures
    void putDescriptor(int i, int fd) {
        pollArray.putInt(SIZE_POLLFD * i + FD_OFFSET, fd);
    }

    void putEventOps(int i, int event) {
        pollArray.putShort(SIZE_POLLFD * i + EVENT_OFFSET, (short)event); }...// Adds Windows wakeup socket at a given index.
    void addWakeupSocket(int fdVal, int index) { putDescriptor(index, fdVal); putEventOps(index, Net.POLLIN); }}Copy the code

The POLLIN event of wakeupSourceFd is identified as the corresponding value of pollArray’s EventOps, using the unsafe memory for its immediate operation. POLLIN is written to the value represented by net.pollin at SIZE_POLLFD * I + EVENT_OFFSET relative to the memory address where the pollArray resides, as shown in the local method source code below. The same thing with the putDescriptor. WakeupSourceFd, the file descriptor corresponding to source, is ready when data is written to sink.

//java.base/windows/native/libnio/ch/nio_util.h
    /* WSAPoll()/WSAPOLLFD and the corresponding constants are only defined   */
    /* in Windows Vista / Windows Server 2008 and later. If we are on an      */
    /* older release we just use the Solaris constants as this was previously */
    /* done in PollArrayWrapper.java.                                         */
    #define POLLIN       0x0001
    #define POLLOUT      0x0004
    #define POLLERR      0x0008
    #define POLLHUP      0x0010
    #define POLLNVAL     0x0020
    #define POLLCONN     0x0002
Copy the code

The parent of the AllocatedNativeObject class has a number of unsafe class operations, all of which are directly memory-level operations. Unsafe.allocatememory (size + ps) pollArray is a chunk of system memory allocated via unsafe.allocatememory (size + ps).

class AllocatedNativeObject                             // package-private
    extends NativeObject
{
    /**
     * Allocates a memory area of at least {@code size} bytes outside of the
     * Java heap and creates a native object for that area.
     */
    AllocatedNativeObject(int size, boolean pageAligned) {
        super(size, pageAligned);
    }

    /** * Frees the native memory area associated with this object. */
    synchronized void free(a) {
        if(allocationAddress ! =0) {
            unsafe.freeMemory(allocationAddress);
            allocationAddress = 0; }}}//sun.nio.ch.NativeObject#NativeObject(int, boolean)
protected NativeObject(int size, boolean pageAligned) {
        if(! pageAligned) {this.allocationAddress = unsafe.allocateMemory(size);
            this.address = this.allocationAddress;
        } else {
            int ps = pageSize();
            long a = unsafe.allocateMemory(size + ps);
            this.allocationAddress = a;
            this.address = a + ps - (a & (ps - 1)); }}Copy the code

At this point, we are done with our reading of Selector.open(), whose main task is to complete the Pipe creation and put the wakeupSourceFd on the Pipe Source end into the pollArray, which is the hub for the Selector to complete its role. This article mainly focuses on the implementation of Windows Pipe, that is, in Windows through two connected Socketchannels to implement Pipe, Linux directly using the system Pipe.

SelectionKey management in selector

The SelectionKey is registered in the selector

The so-called registration is to place an object on a container field in the registered object, which can be an array, a queue, a set or a list. Here, it’s the same thing, except that it needs to have a return value, so you can just return the object that you want to put in the collection.

//sun.nio.ch.SelectorImpl#register
@Override
protected final SelectionKey register(AbstractSelectableChannel ch,
                                        int ops,
                                        Object attachment)
{
    if(! (chinstanceof SelChImpl))
        throw new IllegalSelectorException();
    SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);
    k.attach(attachment);

    // register (if needed) before adding to key set
    implRegister(k);

    // add to the selector's key set, removing it immediately if the selector
    // is closed. The key is not in the channel's key set at this point but
    // it may be observed by a thread iterating over the selector's key set.
    keys.add(k);
    try {
        k.interestOps(ops);
    } catch (ClosedSelectorException e) {
        assert ch.keyFor(this) = =null;
        keys.remove(k);
        k.cancel();
        throw e;
    }
    return k;
}
//sun.nio.ch.WindowsSelectorImpl#implRegister
@Override
protected void implRegister(SelectionKeyImpl ski) {
    ensureOpen();
    synchronized(updateLock) { newKeys.addLast(ski); }}Copy the code

We’ve seen this code before, so let’s go over it again. We’re going to create a new SelectionKeyImpl object, which is the wrapper around the Channel, and not only that, but we’re also going to take the current Selector object in, so we can get the corresponding Selector object from the SelectionKey object as well.

Next, implRegister, implemented on the Windows platform, first ensures that the Selector is turned on through ensureOpen(). This SelectionKeyImpl is then added to the WindowsSelectorImpl newKeys, an ArrayDeque object, that manages the newly registered SelectionKey. For those who do not understand ArrayDeque, you can refer to the Java container source code analysis of Deque and ArrayDeque.

SelectionKeyImpl is then added to sun.nio.ch.selectorImpl# keys. The Set

represents the Set of selectionKeys registered with the current Selector. Let’s look at the sun.nio.ch.selectorImpl constructor:

//sun.nio.ch.SelectorImpl#SelectorImpl
protected SelectorImpl(SelectorProvider sp) {
    super(sp);
    keys = ConcurrentHashMap.newKeySet();
    selectedKeys = new HashSet<>();
    publicKeys = Collections.unmodifiableSet(keys);
    publicSelectedKeys = Util.ungrowableSet(selectedKeys);
}
Copy the code

If we want to know which keys are registered on the current Selector, we can call Sun.nio.ch.selectorImpll #keys:

//sun.nio.ch.SelectorImpl#keys
@Override
public final Set<SelectionKey> keys(a) {
    ensureOpen();
    return publicKeys;
}
Copy the code

Back in the constructor, selectedKeys, as the name suggests, belong to the selectedKeys, the SelectionKey corresponding to the Channel that was ready during the previous operation. This collection is a subset of keys. Get it from selector. SelectedKeys ().

//sun.nio.ch.SelectorImpl#selectedKeys
@Override
public final Set<SelectionKey> selectedKeys(a) {
    ensureOpen();
    return publicSelectedKeys;
}
Copy the code

We see that it returns publicSelectedKeys, which can be deleted, but not added, to elements in this field. In front of the content, we have involved SelectionKey cancelled, so we in the Java nio. Channels. The spi. AbstractSelector method, there is a definition of cancelledKeys, is also a HashSet object. This represents a SelectionKey that has been deregistered but not yet deregistered. This Set is not directly accessible; again, it is a subset of keys().

For a new Selector instance, all of the above collections are empty. As you can see from the source code shown above, the SelectionKey is added to the keys via channel.register, which is the source of the key. If a selectionkey.cancel () is called, the key will be added to the set of cancelledKeys, and then during the next call to the Selector Select method, the canceldKeys will not be empty. The Deregister operation on the SelectionKey (freeing the resource and removing it from the keys) is triggered. Either channel.close() or selectionkey.cancel () causes the selectionKey to be added to the cannceldKey.

Keys can be added to or removed from The cancelledKeys during each select operation.

An interpretation of the select method of Selector

With that in mind, let’s dive into the SELECT method and look at it in detail. Select (),selectNow(),select(long timeout); Select (Consumer

action, long timeout), select(Consumer

action) SelectNow (Consumer < SelectionKey > action). The latter is a new API in JDK11 that allows channels that are ready for I/O to perform a custom operation on the corresponding key during the select process.

Note that the select operation with the Consumer

action is blocked, and the Selector instance’s wakeup method is invoked only if at least one Channel is selected, or if the thread on which it is located is interrupted.

//sun.nio.ch.SelectorImpl
@Override
public final int select(long timeout) throws IOException {
    if (timeout < 0)
        throw new IllegalArgumentException("Negative timeout");
    return lockAndDoSelect(null, (timeout == 0)? -1 : timeout);
}

//sun.nio.ch.SelectorImpl
@Override
public final int select(Consumer<SelectionKey> action, long timeout)
    throws IOException
{
    Objects.requireNonNull(action);
    if (timeout < 0)
        throw new IllegalArgumentException("Negative timeout");
    return lockAndDoSelect(action, (timeout == 0)? -1 : timeout);
}
//sun.nio.ch.SelectorImpl#lockAndDoSelect
private int lockAndDoSelect(Consumer<SelectionKey> action, long timeout)
        throws IOException
    {
        synchronized (this) {
            ensureOpen();
            if (inSelect)
                throw new IllegalStateException("select in progress");
            inSelect = true;
            try {
                synchronized (publicSelectedKeys) {
                    returndoSelect(action, timeout); }}finally {
                inSelect = false; }}}Copy the code

We can observe that either way, they all end up on the lockAndDoSelect method, which ultimately performs the doSelect(Action, timeout) implementation on the particular system. Here we use sun. Nio. Ch. WindowsSelectorImpl# doSelect as an example to tell its operation steps:

// sun.nio.ch.WindowsSelectorImpl#doSelect
@Override
protected int doSelect(Consumer<SelectionKey> action, long timeout)
    throws IOException
    {
        assert Thread.holdsLock(this);
        this.timeout = timeout; // set selector timeout
        processUpdateQueue();  / / < 1 >
        processDeregisterQueue(); / / < 2 >
        if (interruptTriggered) {
            resetWakeupSocket();
            return 0;
        }
        // Calculate number of helper threads needed for poll. If necessary
        // threads are created here and start waiting on startLock
        adjustThreadsCount();
        finishLock.reset(); // reset finishLock
        // Wakeup helper threads, waiting on startLock, so they start polling.
        // Redundant threads will exit here after wakeup.
        startLock.startThreads();
        // do polling in the main thread. Main thread is responsible for
        // first MAX_SELECTABLE_FDS entries in pollArray.
        try {
            begin();
            try {
                subSelector.poll();  / / < 3 >
            } catch (IOException e) {
                finishLock.setException(e); // Save this exception
            }
            // Main thread is out of poll(). Wakeup others and wait for them
            if (threads.size() > 0)
                finishLock.waitForHelperThreads();
          } finally {
              end();
          }
        // Done with poll(). Set wakeupSocket to nonsignaled for the next run.
        finishLock.checkForException();
        processDeregisterQueue();  / / < 4 >
        int updated = updateSelectedKeys(action); / / < 5 >
        // Done with poll(). Set wakeupSocket to nonsignaled for the next run.
        resetWakeupSocket(); / / < 6 >
        return updated;
    }
Copy the code

ProcessUpdateQueue interpretation

  1. First through the corresponding operating system implementation class (here is WindowsSelectorImpl) concrete implementation we can know, ProcessUpdateQueue () at <1> gets interestOps at this point for each remaining Channel (some of which are cancelled), including the newly registered and updateKeys, and pollWrapper management for them.

    • For newly registered SelectionKeyImpl, We have an offset from SIZE_POLLFD * totalChannels + FD_OFFSET and SIZE_POLLFD * totalChannels + relative to the memory address where pollArray resides EVENT_OFFSET is stored in the file descriptor FD of SelectionKeyImpl and its corresponding EventOps (initially 0).

    • For updateKeys, because it’s already stored relative to pollArray, we need to check the validity of the key we got, and if it’s valid, Simply write the interestOps of the SelectionKeyImpl object you are manipulating to the EventOps location that holds it in the pollWrapper.

    Note: If newKeys are valid, growIfNeeded() is called. Channelarray. length == totalChannels The initial capacity is 8. ChannelArray is an array that manages the number of selectionKeyimpls in the list. If it’s the same as totalChannels, it’s not just for channelArray expansion. More importantly, to assist pollWrapper, expanding pollWrapper is the goal.

    If totalChannels % MAX_SELECTABLE_FDS == 0, one more thread is opened to process the selector. On Windows, the SELECT system call has a maximum limit of 1024 file descriptors. If there are more than 1024 file descriptors, multi-thread polling is required. At the same time call pollWrapper. AddWakeupSocket (wakeupSourceFd, TotalChannels) writes the value of wakeupSourceFd at the memory address offset of the pollArray, SIZE_POLLFD * totalChannels + FD_OFFSET. The new thread can then call MAX_SELECTABLE_FDS to determine the wakeupSourceFd to monitor to wakeup the selector. Ski.setindex (totalChannels) records the index position of SelectionKeyImpl in the array for future use.

   /** * sun.nio.ch.WindowsSelectorImpl#processUpdateQueue * Process new registrations and changes to the interest ops. */
private void processUpdateQueue(a) {
    assert Thread.holdsLock(this);

    synchronized (updateLock) {
        SelectionKeyImpl ski;

        // new registrations
        while((ski = newKeys.pollFirst()) ! =null) {
            if (ski.isValid()) {
                growIfNeeded();
                channelArray[totalChannels] = ski;
                ski.setIndex(totalChannels);
                pollWrapper.putEntry(totalChannels, ski);
                totalChannels++;
                MapEntry previous = fdMap.put(ski);
                assert previous == null; }}// changes to interest ops
        while((ski = updateKeys.pollFirst()) ! =null) {
            int events = ski.translateInterestOps();
            int fd = ski.getFDVal();
            if (ski.isValid() && fdMap.containsKey(fd)) {
                int index = ski.getIndex();
                assert index >= 0&& index < totalChannels; pollWrapper.putEventOps(index, events); }}}}//sun.nio.ch.PollArrayWrapper#putEntry
// Prepare another pollfd struct for use.
void putEntry(int index, SelectionKeyImpl ski) {
    putDescriptor(index, ski.getFDVal());
    putEventOps(index, 0);
}
//sun.nio.ch.WindowsSelectorImpl#growIfNeeded
private void growIfNeeded(a) {
    if (channelArray.length == totalChannels) {
        int newSize = totalChannels * 2; // Make a larger array
        SelectionKeyImpl temp[] = new SelectionKeyImpl[newSize];
        System.arraycopy(channelArray, 1, temp, 1, totalChannels - 1);
        channelArray = temp;
        pollWrapper.grow(newSize);
    }
    if (totalChannels % MAX_SELECTABLE_FDS == 0) { // more threads neededpollWrapper.addWakeupSocket(wakeupSourceFd, totalChannels); totalChannels++; threadsCount++; }}// Initial capacity of the poll array
private final int INIT_CAP = 8;
// Maximum number of sockets for select().
// Should be INIT_CAP times a power of 2
private static final int MAX_SELECTABLE_FDS = 1024;

// The list of SelectableChannels serviced by this Selector. Every mod
// MAX_SELECTABLE_FDS entry is bogus, to align this array with the poll
// array, where the corresponding entry is occupied by the wakeupSocket
private SelectionKeyImpl[] channelArray = new SelectionKeyImpl[INIT_CAP];
// The number of valid entries in poll array, including entries occupied
// by wakeup socket handle.
private int totalChannels = 1;

//sun.nio.ch.PollArrayWrapper#grow
// Grows the pollfd array to new size
void grow(int newSize) {
    PollArrayWrapper temp = new PollArrayWrapper(newSize);
    for (int i = 0; i < size; i++)
        replaceEntry(this, i, temp, i);
    pollArray.free();
    pollArray = temp.pollArray;
    this.size = temp.size;
    pollArrayAddress = pollArray.address();
}

// Maps file descriptors to their indices in pollArray
private static final class FdMap extends HashMap<Integer.MapEntry> {
    static final long serialVersionUID = 0L;
    private MapEntry get(int desc) {
        return get(Integer.valueOf(desc));
    }
    private MapEntry put(SelectionKeyImpl ski) {
        return put(Integer.valueOf(ski.getFDVal()), new MapEntry(ski));
    }
    private MapEntry remove(SelectionKeyImpl ski) {
        Integer fd = Integer.valueOf(ski.getFDVal());
        MapEntry x = get(fd);
        if((x ! =null) && (x.ski.channel() == ski.channel()))
            return remove(fd);
        return null; }}// class for fdMap entries
private static final class MapEntry {
    final SelectionKeyImpl ski;
    long updateCount = 0;
    MapEntry(SelectionKeyImpl ski) {
        this.ski = ski; }}private final FdMap fdMap = new FdMap();
Copy the code
ProcessDeregisterQueue interpretation
  1. Then through theWindowsselectorpl #doSelect <2>Place theprocessDeregisterQueue().
    • rightcancelledKeysClear, iteratecancelledKeysAnd for everykeyforderegisterOperation and then fromcancelledKeysDelete from the collectionkeysThe collection andselectedKeysTo release references for gc collection,
    • It callsimplDeregThe method will be fromchannelArrayTo remove the correspondingChannelOn behalf of theSelectionKeyImplAnd adjust thetotalChannelsAnd number of threads frommapandkeysRemove theSelectionKeyImplTo removeChannelOn theSelectionKeyImplAnd close theChannel.
    • It was also found thatprocessDeregisterQueue()Method is callingpollMethod is called before and after, which is to ensure that the call is handled correctlypollKeys cancelled during the time that the method is blocked can be cleaned up in time.
    • And then finally, we’re going to judge thiscancelledKeyRepresented by thechannelWhether to open and deregister. If so, the resources occupied by the corresponding file descriptor should be closed.
   /** * sun.nio.ch.SelectorImpl#processDeregisterQueue * Invoked by selection operations to process the cancelled-key set * /
protected final void processDeregisterQueue(a) throws IOException {
    assert Thread.holdsLock(this);
    assert Thread.holdsLock(publicSelectedKeys);

    Set<SelectionKey> cks = cancelledKeys();
    synchronized (cks) {
        if(! cks.isEmpty()) { Iterator<SelectionKey> i = cks.iterator();while (i.hasNext()) {
                SelectionKeyImpl ski = (SelectionKeyImpl)i.next();
                i.remove();

                // remove the key from the selector
                implDereg(ski);

                selectedKeys.remove(ski);
                keys.remove(ski);

                // remove from channel's key set
                deregister(ski);

                SelectableChannel ch = ski.channel();
                if(! ch.isOpen() && ! ch.isRegistered()) ((SelChImpl)ch).kill(); }}}}//sun.nio.ch.WindowsSelectorImpl#implDereg
@Override
protected void implDereg(SelectionKeyImpl ski) {
    assert! ski.isValid();assert Thread.holdsLock(this);

    if(fdMap.remove(ski) ! =null) {
        int i = ski.getIndex();
        assert (i >= 0);

        if(i ! = totalChannels -1) {
            // Copy end one over it
            SelectionKeyImpl endChannel = channelArray[totalChannels-1];
            channelArray[i] = endChannel;
            endChannel.setIndex(i);
            pollWrapper.replaceEntry(pollWrapper, totalChannels-1, pollWrapper, i);
        }
        ski.setIndex(-1);

        channelArray[totalChannels - 1] = null;
        totalChannels--;
        if(totalChannels ! =1 && totalChannels % MAX_SELECTABLE_FDS == 1) {
            totalChannels--;
            threadsCount--; // The last thread has become redundant.}}}//sun.nio.ch.SocketChannelImpl#kill
@Override
public void kill(a) throws IOException {
    synchronized (stateLock) {
        if(state == ST_KILLPENDING) { state = ST_KILLED; nd.close(fd); }}}/ / C: / Program Files/Java/JDK - 11.0.1 / lib/SRC. Zip! /java.base/sun/nio/ch/SocketChannelImpl.java:1126
static {
    IOUtil.load();
    nd = new SocketDispatcher();
}
//sun.nio.ch.SocketDispatcher#close
void close(FileDescriptor fd) throws IOException {
    close0(fd);
}
Copy the code
AdjustThreadsCount interpretation
  1. And then we seeThe above WindowsSelectorImpl# doSelectShow the source codeadjustThreadsCount()Method call.
    • If was mentioned earliertotalChannels % MAX_SELECTABLE_FDS == 0, then open one more thread for processingselector. Here is the basisThe number of threads allocatedTo add or subtract threads, which is the maximum for the operating systemselectThe file descriptor for the operation limits the number of threads to be adjusted.
    • Let’s look at what the created thread does, which is observeSelectThreadtherunMethod implementation. By looking at the source code you can see that it is firstwhile (true)Through thestartLock.waitForStart(this)To control whether the thread is running or waitingsubSelector.poll(index)(More on this later),
    • When this threadpollEnd and if there are multiple threads relative to the current main threadSelectThreadThe child thread, the current threadSelectThreadThe thread terminates firstpollIf so, callfinishLock.threadFinished()To notify the main thread. Just create this thread and call itrunMethod of time, at this timelastRun = 0At the first startupsun.nio.ch.WindowsSelectorImpl.StartLock#runsCounterIs also 0, so it callsstartLock.wait()Then enter the waiting state.

Note:

  • sun.nio.ch.WindowsSelectorImpl.StartLockIt also determines whether the thread it is currently detecting is deprecated and returns if it is deprecatedtrue, so that the detected thread can exit its inner run methodwhileLoop to terminate the thread.
  • While adjusting the thread (calladjustThreadsCountMethod) andSelectorcallcloseMethod is called indirectly tosun.nio.ch.WindowsSelectorImpl#implCloseBoth methods are involvedSelectorThe release of the thread, that is, the callsun.nio.ch.WindowsSelectorImpl.SelectThread#makeZombie.
  • finishLock.threadFinished()Will be calledwakeup()Method to notify the main thread. Here, we can learn a detail if the thread is blocking inselectMethod, you can call itwakeupMethod will cause the blocking selection operation to return immediately, throughWindowsIn fact, the principle is topipethesinkThe end writes a byte,sourceThe file descriptor will be in a ready state,pollMethod will return, resulting inselectMethod returns. Other Solaris or Linux systems actually use system callspipeTo complete the creation of the pipeline, equivalent to directly using the system pipeline. throughwakeup()Related implementations can also be seen by callingwakeupWill be setinterruptTriggeredFlag bit, so multiple consecutive callswakeupThe effect is equivalent to a single call and does not cause any bugs.
//sun.nio.ch.WindowsSelectorImpl#adjustThreadsCount
// After some channels registered/deregistered, the number of required
// helper threads may have changed. Adjust this number.
private void adjustThreadsCount(a) {
    if (threadsCount > threads.size()) {
        // More threads needed. Start more threads.
        for (int i = threads.size(); i < threadsCount; i++) {
            SelectThread newThread = new SelectThread(i);
            threads.add(newThread);
            newThread.setDaemon(true); newThread.start(); }}else if (threadsCount < threads.size()) {
        // Some threads become redundant. Remove them from the threads List.
        for (int i = threads.size() - 1; i >= threadsCount; i--) threads.remove(i).makeZombie(); }}//sun.nio.ch.WindowsSelectorImpl.SelectThread
// Represents a helper thread used for select.
private final class SelectThread extends Thread {
    private final int index; // index of this thread
    final SubSelector subSelector;
    private long lastRun = 0; // last run number
    private volatile boolean zombie;
    // Creates a new thread
    private SelectThread(int i) {
        super(null.null."SelectorHelper".0.false);
        this.index = i;
        this.subSelector = new SubSelector(i);
        //make sure we wait for next round of poll
        this.lastRun = startLock.runsCounter;
    }
    void makeZombie(a) {
        zombie = true;
    }
    boolean isZombie(a) {
        return zombie;
    }
    public void run(a) {
        while (true) { // poll loop
            // wait for the start of poll. If this thread has become
            // redundant, then exit.
            if (startLock.waitForStart(this))
                return;
            // call poll()
            try {
                subSelector.poll(index);
            } catch (IOException e) {
                // Save this exception and let other threads finish.
                finishLock.setException(e);
            }
            // notify main thread, that this thread has finished, and
            // wakeup others, if this thread is the first to finish.finishLock.threadFinished(); }}}// sun.nio.ch.WindowsSelectorImpl.FinishLock#threadFinished
// Each helper thread invokes this function on finishLock, when
// the thread is done with poll().
private synchronized void threadFinished(a) {
    if (threadsToFinish == threads.size()) { // finished poll() first
        // if finished first, wakeup others
        wakeup();
    }
    threadsToFinish--;
    if (threadsToFinish == 0) // all helper threads finished poll().
        notify();             // notify the main thread
}

//sun.nio.ch.WindowsSelectorImpl#wakeup
@Override
public Selector wakeup(a) {
    synchronized (interruptLock) {
        if(! interruptTriggered) { setWakeupSocket(); interruptTriggered =true; }}return this;
}
//sun.nio.ch.WindowsSelectorImpl#setWakeupSocket
// Sets Windows wakeup socket to a signaled state.
private void setWakeupSocket(a) {
    setWakeupSocket0(wakeupSinkFd);
}
private native void setWakeupSocket0(int wakeupSinkFd);

JNIEXPORT void JNICALL
Java_sun_nio_ch_WindowsSelectorImpl_setWakeupSocket0(JNIEnv *env, jclass this,
                                                jint scoutFd)
{
    /* Write one byte into the pipe */
    const char byte = 1;
    send(scoutFd, &byte.1.0);
}
Copy the code
Poll method interpretation of subSelector
  1. subSelector.poll()Is the core of SELECTnativefunctionpoll0Realize and putpollWrapper.pollArrayAddressPass as a parameterpoll0.readFds,writeFdsexceptFdsArrays are used to hold the bottom layerselectThe first position in the array is where the event occurssocketThe total number of events, and the remaining locations for eventssockethandlefd. We know from the following code: thispoll0()Will listenpollWrapperIn theFDThere is no data coming in and out here that will causeIOBlocks until a data read or write event occurs. Due to thepollWrapperThere are also preserved inServerSocketChanneltheFD, so as long asClientSocketSend a copy of the dataServerSocket, thenpoll0()It will return; And because of thepollWrapperThere are also preserved inpipethewritetheFD, so as long aspipethewriteTo the endFDSend a data, it will also causepoll0()Return; If neither of those things happens, thenpoll0()It’s going to keep blocking, which is thetaselector.select()It keeps blocking; If any of those things happen, thenselector.select()It’s going to return all of theSelectThreadtherun()To usewhile (true) {}In this way, we can make sure thatselectorContinue listening after data is received and processedpoll();

As you can see, NIO is still blocking IO, so how does it differ from BIO? The difference is where the block is located. BIO blocks in recvFROM and NIO blocks in SELECT. So what good does that do? If you simply change the location of the block, of course, nothing will change, but the clever implementation of Epoll and others is to use the callback mechanism, so that the listener only needs to know which socket data is ready, and only need to process the data on those threads. In the BIO, there are 1000 connections, 1000 threads need to be opened, and 1000 read positions are blocked. In the NIO, only 1 thread is needed. It uses the polling strategy of SELECT with the event mechanism of ePoll and the red-black tree data structure. It reduces the overhead of internal polling and greatly reduces the overhead of thread context switching.

//sun.nio.ch.WindowsSelectorImpl.SubSelector
private final class SubSelector {
        private final int pollArrayIndex; // starting index in pollArray to poll
        // These arrays will hold result of native select().
        // The first element of each array is the number of selected sockets.
        // Other elements are file descriptors of selected sockets.
        // Save the FD where the read occurred
        private final int[] readFds = new int [MAX_SELECTABLE_FDS + 1];
        // Save the FD where the write occurred
        private final int[] writeFds = new int [MAX_SELECTABLE_FDS + 1];
        // Save FD where except occurs
        private final int[] exceptFds = new int [MAX_SELECTABLE_FDS + 1];

        private SubSelector(a) {
            this.pollArrayIndex = 0; // main thread
        }

        private SubSelector(int threadIndex) { // helper threads
            this.pollArrayIndex = (threadIndex + 1) * MAX_SELECTABLE_FDS;
        }

        private int poll(a) throws IOException{ // poll for the main thread
            returnpoll0(pollWrapper.pollArrayAddress, Math.min(totalChannels, MAX_SELECTABLE_FDS), readFds, writeFds, exceptFds, timeout);  }private int poll(int index) throws IOException {
            // poll for helper threads
            return  poll0(pollWrapper.pollArrayAddress +
                     (pollArrayIndex * PollArrayWrapper.SIZE_POLLFD),
                     Math.min(MAX_SELECTABLE_FDS,
                             totalChannels - (index + 1) * MAX_SELECTABLE_FDS),
                     readFds, writeFds, exceptFds, timeout);
        }

        private native int poll0(long pollAddress, int numfds,
             int[] readFds, int[] writeFds, int[] exceptFds, long timeout); . }Copy the code
UpdateSelectedKeys interpretation
  1. I’m going to go throughWindowsSelectorImpl#doSelect <5>Place theupdateSelectedKeys(action)To deal with eachchannelthereadyThe information.
  • If the channel’skeyNot yet inselectedKeysIf it exists, it is added to the collection.
  • If the channel’skeyexistingselectedKeysPhi is thischannelThere is supportReadyOpsThe ready operation must contain one of these operations (provided by(ski.nioReadyOps() & ski.nioInterestOps()) ! = 0To determine), then modify itReadyOpsIs the current operation to be performed. And what we saw beforeConsumer<SelectionKey>This is also where the action takes place. As you can see from the following source, previously recorded inReadyOpsAny ready information in this callactionIt was thrown away before, and it went straight to setting.
//sun.nio.ch.WindowsSelectorImpl#updateSelectedKeys
private int updateSelectedKeys(Consumer<SelectionKey> action) {
    updateCount++;
    int numKeysUpdated = 0;
    numKeysUpdated += subSelector.processSelectedKeys(updateCount, action);
    for (SelectThread t: threads) {
        numKeysUpdated += t.subSelector.processSelectedKeys(updateCount, action);
    }
    return numKeysUpdated;
}
//sun.nio.ch.SelectorImpl#processReadyEvents
protected final int processReadyEvents(int rOps,
                                        SelectionKeyImpl ski,
                                        Consumer<SelectionKey> action) {
    if(action ! =null) {
        ski.translateAndSetReadyOps(rOps);
        if((ski.nioReadyOps() & ski.nioInterestOps()) ! =0) {
            action.accept(ski);
            ensureOpen();
            return 1; }}else {
        assert Thread.holdsLock(publicSelectedKeys);
        if (selectedKeys.contains(ski)) {
            if (ski.translateAndUpdateReadyOps(rOps)) {
                return 1; }}else {
            ski.translateAndSetReadyOps(rOps);
            if((ski.nioReadyOps() & ski.nioInterestOps()) ! =0) {
                selectedKeys.add(ski);
                return 1; }}}return 0;
}
//sun.nio.ch.WindowsSelectorImpl.SubSelector#processSelectedKeys
private int processSelectedKeys(long updateCount, Consumer<SelectionKey> action) {
    int numKeysUpdated = 0;
    numKeysUpdated += processFDSet(updateCount, action, readFds,
                                    Net.POLLIN,
                                    false);
    numKeysUpdated += processFDSet(updateCount, action, writeFds,
                                    Net.POLLCONN |
                                    Net.POLLOUT,
                                    false);
    numKeysUpdated += processFDSet(updateCount, action, exceptFds,
                                    Net.POLLIN |
                                    Net.POLLCONN |
                                    Net.POLLOUT,
                                    true);
    return numKeysUpdated;
}

    /** * sun.nio.ch.WindowsSelectorImpl.SubSelector#processFDSet * updateCount is used to tell if a key has been counted as  updated * in this select operation. * * me.updateCount <= updateCount */
private int processFDSet(long updateCount,
                            Consumer<SelectionKey> action,
                            int[] fds, int rOps,
                            boolean isExceptFds)
{
    int numKeysUpdated = 0;
    for (int i = 1; i <= fds[0]; i++) {
        int desc = fds[i];
        if (desc == wakeupSourceFd) {
            synchronized (interruptLock) {
                interruptTriggered = true;
            }
            continue;
        }
        MapEntry me = fdMap.get(desc);
        // If me is null, the key was deregistered in the previous
        // processDeregisterQueue.
        if (me == null)
            continue;
        SelectionKeyImpl sk = me.ski;

        // The descriptor may be in the exceptfds set because there is
        // OOB data queued to the socket. If there is OOB data then it
        // is discarded and the key is not added to the selected set.
        if (isExceptFds &&
            (sk.channel() instanceof SocketChannelImpl) &&
            discardUrgentData(desc))
        {
            continue;
        }
        // We should be concerned
        int updated = processReadyEvents(rOps, sk, action);
        if (updated > 0 && me.updateCount != updateCount) {
            me.updateCount = updateCount;
            numKeysUpdated++;
        }
    }
    return numKeysUpdated;
}

Copy the code

That’s it for now, and I’ll look at Java NIO Buffers in the next article.