From BIO to NIO - NIO source code interpretation 3 (Selector)


Preface

Introducing SelectionKey

As mentioned in the previous articles, once the students (the channels) are ready, their status is recorded through a SelectionKey, and the Selector then handles them.

In the last article we did not say much about register in SelectableChannel, but we did mention that SelectableChannel is what allows a channel to be multiplexed by a Selector.

The Selector acts as an administrator: if a channel wants to be multiplexed, it must register its information with the administrator. The register method of SelectableChannel is therefore the focus of what follows. For the details of register, see the previous article.

SelectableChannel is a Channel, and SelectionKey is the key to how it takes part in selection. The design resembles table design in a database: we could put every property into one table, but splitting it into two tables makes the code easier to write and the responsibilities clearer. It is also like the organs of a body: each has its own function, each partly relies on the others, and together they work well.

Thus, we can consider SelectionKey as the link between SelectableChannel and Selector. Put simply, it is a token: whoever holds the token can access the service.

//java.nio.channels.spi.AbstractSelectableChannel#register
public final SelectionKey register(Selector sel, int ops, Object att)
    throws ClosedChannelException
{       ...
    synchronized (regLock) {
       ...
        synchronized (keyLock) {
           ...
            SelectionKey k = findKey(sel);
            if (k != null) {
                k.attach(att);
                k.interestOps(ops);
            } else {
                // New registration
                k = ((AbstractSelector)sel).register(this, ops, att);
                addKey(k);
            }
            return k;
        }
    }
}
//sun.nio.ch.SelectorImpl#register
@Override
protected final SelectionKey register(AbstractSelectableChannel ch,
                                        int ops,
                                        Object attachment)
{
    if (!(ch instanceof SelChImpl))
        throw new IllegalSelectorException();
    SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);
    k.attach(attachment);

    // register (if needed) before adding to key set
    implRegister(k);

    // add to the selector's key set, removing it immediately if the selector
    // is closed. The key is not in the channel's key set at this point but
    // it may be observed by a thread iterating over the selector's key set.
    keys.add(k);
    try {
        k.interestOps(ops);
    } catch (ClosedSelectorException e) {
        assert ch.keyFor(this) == null;
        keys.remove(k);
        k.cancel();
        throw e;
    }
    return k;
}

From the two blocks of code above we can see that when a Channel is registered with a Selector, a SelectionKey is created and returned. As mentioned before, once a Channel is registered with a Selector it stays registered until it is deregistered, at which point the Selector releases all resources allocated to that Channel. A SelectionKey remains valid until its cancel method is invoked, its Channel is closed, or its Selector is closed. We have also discussed the validity of a SelectionKey: cancelling a key does not remove it from the Selector immediately. Instead it is added to the Selector's cancelledKeys set and removed during the next selection operation. Validity can be checked with java.nio.channels.SelectionKey#isValid.
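To make the two branches of AbstractSelectableChannel#register concrete, here is a minimal sketch. It assumes a java.nio.channels.Pipe is an acceptable stand-in for any selectable channel (it is used only because it needs no network setup); the class and method names are made up for illustration. Registering the same channel twice with the same selector should take the findKey(sel) != null branch and hand back the existing key with its attachment and interest ops updated.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

class ReRegisterDemo {

    /** Registers one channel twice with the same selector and checks what comes back. */
    static boolean sameKeyOnReRegister() {
        try {
            Pipe pipe = Pipe.open(); // only used to obtain a selectable channel
            try (Pipe.SourceChannel source = pipe.source();
                 Pipe.SinkChannel sink = pipe.sink();
                 Selector selector = Selector.open()) {
                source.configureBlocking(false);
                // First call: no key exists yet, so a new SelectionKey is created.
                SelectionKey first = source.register(selector, SelectionKey.OP_READ, "first");
                // Second call: the existing key is found and only updated.
                SelectionKey second = source.register(selector, 0, "second");
                return first == second
                        && "second".equals(first.attachment())
                        && first.interestOps() == 0
                        && source.keyFor(selector) == first;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("same key returned: " + sameKeyOnReRegister());
    }
}
```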

SelectionKey defines four operation-set bits, each represented by an int value:

    /**
     * Operation-set bit for read operations.
     */
    public static final int OP_READ = 1 << 0;

    /**
     * Operation-set bit for write operations.
     */
    public static final int OP_WRITE = 1 << 2;

    /**
     * Operation-set bit for socket-connect operations.
     */
    public static final int OP_CONNECT = 1 << 3;

    /**
     * Operation-set bit for socket-accept operations.
     */
    public static final int OP_ACCEPT = 1 << 4;
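Because each operation is a single bit, several operations can be combined with | and tested with &. The helper below is a hypothetical illustration (describe is not part of the JDK) that decodes an ops value back into names.

```java
import java.nio.channels.SelectionKey;
import java.util.StringJoiner;

class OpsBitsDemo {

    /** Decodes an operation set into a readable string such as "READ|WRITE". */
    static String describe(int ops) {
        StringJoiner names = new StringJoiner("|");
        if ((ops & SelectionKey.OP_READ) != 0) names.add("READ");
        if ((ops & SelectionKey.OP_WRITE) != 0) names.add("WRITE");
        if ((ops & SelectionKey.OP_CONNECT) != 0) names.add("CONNECT");
        if ((ops & SelectionKey.OP_ACCEPT) != 0) names.add("ACCEPT");
        return names.toString();
    }

    public static void main(String[] args) {
        int ops = SelectionKey.OP_READ | SelectionKey.OP_WRITE; // 1 | 4 = 5
        System.out.println(ops + " -> " + describe(ops));
        System.out.println(SelectionKey.OP_ACCEPT + " -> " + describe(SelectionKey.OP_ACCEPT));
    }
}
```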

interestOps

interestOps determines which operations the Selector will test for readiness on the Channel during the next selection operation. It is initialised with the value given when the SelectionKey is created and can be changed afterwards through sun.nio.ch.SelectionKeyImpl#interestOps(int). The field itself is declared in SelectionKeyImpl:

 //sun.nio.ch.SelectionKeyImpl
 public final class SelectionKeyImpl
    extends AbstractSelectionKey
{
    private static final VarHandle INTERESTOPS =
            ConstantBootstraps.fieldVarHandle(
                    MethodHandles.lookup(),
                    "interestOps",
                    VarHandle.class,
                    SelectionKeyImpl.class, int.class);

    private final SelChImpl channel;
    private final SelectorImpl selector;

    private volatile int interestOps;
    private volatile int readyOps;

    // registered events in kernel, used by some Selector implementations
    private int registeredEvents;

    // index of key in pollfd array, used by some Selector implementations
    private int index;

    SelectionKeyImpl(SelChImpl ch, SelectorImpl sel) {
        channel = ch;
        selector = sel;
    }
   ...
}

readyOps

readyOps is the set of operations that the Selector has detected as ready on the Channel. When a SelectionKey is created, readyOps is 0. It may be updated later during a selection operation, but it cannot be updated directly through the API.

A bit set in readyOps indicates that the Channel is ready for that operation, but it is only a hint: it does not guarantee that the operation will not block. readyOps is usually updated when a selection operation completes, so it is most accurate immediately afterwards; external events or I/O operations performed on the channel can make it stale. The field is declared volatile, so the value most recently written by the selecting thread is visible to other threads.

SelectionKey defines all the operations, but which of them are supported depends on the channel. Every selectable channel (a subclass of SelectableChannel) exposes SelectableChannel#validOps to report the operations it supports, so each subclass has its own implementation of validOps that returns a bit mask of the supported operations. Setting or testing an operation the channel does not support throws a runtime exception (IllegalArgumentException). The supported operations therefore differ from one channel type to another:
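The following sketch shows how interestOps and readyOps behave right after registration. It assumes JDK 11 or later (interestOpsOr was added there, which is also what the INTERESTOPS VarHandle in the listing above serves), and it uses an unconnected SocketChannel purely to have a channel that supports several operations; no selection is performed.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Arrays;

class InterestOpsDemo {

    /** Returns {readyOps at creation, previous interestOps, interestOps after OR, interestOps after set}. */
    static int[] observe() {
        try (Selector selector = Selector.open();
             SocketChannel channel = SocketChannel.open()) {
            channel.configureBlocking(false);
            SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
            int readyAtCreation = key.readyOps();                    // nothing selected yet
            int previous = key.interestOpsOr(SelectionKey.OP_WRITE); // atomic OR, returns old value
            int afterOr = key.interestOps();
            key.interestOps(SelectionKey.OP_CONNECT);                // plain replacement
            int afterSet = key.interestOps();
            return new int[] {readyAtCreation, previous, afterOr, afterSet};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(observe()));
    }
}
```

Note that readyOps has no setter in the public API: only a selection operation changes it.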

//java.nio.channels.SocketChannel#validOps
public final int validOps() {
    //1|4|8  1101
    return (SelectionKey.OP_READ
            | SelectionKey.OP_WRITE
            | SelectionKey.OP_CONNECT);
}
//java.nio.channels.ServerSocketChannel#validOps
public final int validOps() {
    // 16
    return SelectionKey.OP_ACCEPT;
}
//java.nio.channels.DatagramChannel#validOps
public final int validOps() {
    // 1|4
    return (SelectionKey.OP_READ
            | SelectionKey.OP_WRITE);
}
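As a quick check of the values in the comments above, the sketch below opens one channel of each type and reads validOps, then tries to register a ServerSocketChannel for OP_READ, which it does not support. The class and method names are illustrative only; the channels are never bound or connected.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Arrays;

class ValidOpsDemo {

    /** Returns validOps of SocketChannel, ServerSocketChannel and DatagramChannel, in that order. */
    static int[] validOpsValues() {
        try (SocketChannel sc = SocketChannel.open();
             ServerSocketChannel ssc = ServerSocketChannel.open();
             DatagramChannel dc = DatagramChannel.open()) {
            return new int[] {sc.validOps(), ssc.validOps(), dc.validOps()};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Returns true if registering an unsupported operation is rejected. */
    static boolean rejectsUnsupportedOp() {
        try (Selector selector = Selector.open();
             ServerSocketChannel ssc = ServerSocketChannel.open()) {
            ssc.configureBlocking(false);
            try {
                ssc.register(selector, SelectionKey.OP_READ); // only OP_ACCEPT is valid here
                return false;
            } catch (IllegalArgumentException expected) {
                return true;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(validOpsValues()));
        System.out.println("unsupported op rejected: " + rejectsUnsupportedOp());
    }
}
```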

To associate application data with a SelectionKey, use attach to store an Object as the key's attachment; it can be retrieved with java.nio.channels.SelectionKey#attachment. To discard the attachment, call selectionKey.attach(null).

If the attached object is no longer needed, it must be cleared manually. Otherwise it stays reachable for as long as the SelectionKey exists: the key holds a strong reference to it, so the GC cannot reclaim it, which can lead to a memory leak.

SelectionKey is safe for use by multiple concurrent threads. Keep in mind that a selection operation always uses the interestOps value that was current at the moment the operation began.
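A small sketch of the attachment life cycle follows. The session strings are placeholders for whatever per-connection state an application might keep, and the Pipe is again just a convenient selectable channel. attach returns the previously attached object, which makes it easy to swap or clear state.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

class AttachmentDemo {

    /** Attaches, replaces and clears an attachment; returns true if every step behaved as described. */
    static boolean attachAndDetach() {
        try {
            Pipe pipe = Pipe.open();
            try (Pipe.SourceChannel source = pipe.source();
                 Pipe.SinkChannel sink = pipe.sink();
                 Selector selector = Selector.open()) {
                source.configureBlocking(false);
                SelectionKey key = source.register(selector, SelectionKey.OP_READ, "session-1");
                Object atRegistration = key.attachment();   // set by register
                Object replaced = key.attach("session-2");  // returns the old attachment
                Object detached = key.attach(null);         // clears the strong reference
                return "session-1".equals(atRegistration)
                        && "session-1".equals(replaced)
                        && "session-2".equals(detached)
                        && key.attachment() == null;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("attachment life cycle ok: " + attachAndDetach());
    }
}
```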

The study of Selector

Now that we know the role of Selector, the following sections analyse how it works.

open in Selector

As its name suggests, a SelectableChannel can be multiplexed by a Selector. A selector is created by invoking java.nio.channels.Selector#open:

//java.nio.channels.Selector#open
public static Selector open() throws IOException {
    return SelectorProvider.provider().openSelector();
}

SelectorProvider.provider() returns the system-wide default implementation. On Windows the default is sun.nio.ch.WindowsSelectorProvider, so the implementation that is invoked depends on the operating system:

//java.nio.channels.spi.SelectorProvider#provider
public static SelectorProvider provider() {
   synchronized (lock) {
       if (provider != null)
           return provider;
       return AccessController.doPrivileged(
           new PrivilegedAction<>() {
               public SelectorProvider run() {
                       if (loadProviderFromProperty())
                           return provider;
                       if (loadProviderAsService())
                           return provider;
                       provider = sun.nio.ch.DefaultSelectorProvider.create();
                       return provider;
                   }
               });
   }
}
//sun.nio.ch.DefaultSelectorProvider
public class DefaultSelectorProvider {

    /**
     * Prevent instantiation.
     */
    private DefaultSelectorProvider() { }

    /**
     * Returns the default SelectorProvider.
     */
    public static SelectorProvider create() {
        return new sun.nio.ch.WindowsSelectorProvider();
    }

}

On Windows, the selector ultimately relies on sun.nio.ch.WindowsSelectorImpl for its core work.

public class WindowsSelectorProvider extends SelectorProviderImpl {

    public AbstractSelector openSelector() throws IOException {
        return new WindowsSelectorImpl(this);
    }
}
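To see which implementation is chosen on a given machine, the sketch below opens a selector and reports its provider and concrete class. The class name printed depends on the platform and JDK, so no particular output is assumed; the only property checked is that the selector was created by the system-wide default provider.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;
import java.nio.channels.spi.SelectorProvider;

class ProviderDemo {

    /** Returns true if Selector.open() used the system-wide default provider. */
    static boolean selectorComesFromDefaultProvider() {
        try (Selector selector = Selector.open()) {
            return selector.provider() == SelectorProvider.provider();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Returns the platform-specific selector class name. */
    static String implementationName() {
        try (Selector selector = Selector.open()) {
            return selector.getClass().getName();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("provider: " + SelectorProvider.provider().getClass().getName());
        System.out.println("selector: " + implementationName());
        System.out.println("from default provider: " + selectorComesFromDefaultProvider());
    }
}
```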

Now we can look at the WindowsSelectorImpl constructor:

//sun.nio.ch.WindowsSelectorImpl#WindowsSelectorImpl
WindowsSelectorImpl(SelectorProvider sp) throws IOException {
    super(sp);
    pollWrapper = new PollArrayWrapper(INIT_CAP);
    wakeupPipe = Pipe.open();
    wakeupSourceFd = ((SelChImpl)wakeupPipe.source()).getFDVal();

    // Disable the Nagle algorithm so that the wakeup is more immediate
    SinkChannelImpl sink = (SinkChannelImpl)wakeupPipe.sink();
    (sink.sc).socket().setTcpNoDelay(true);
    wakeupSinkFd = ((SelChImpl)sink).getFDVal();

    pollWrapper.addWakeupSocket(wakeupSourceFd, 0);
}

The wakeup pipe stays open for the lifetime of the Selector and is closed only when the Selector itself is closed.

//java.nio.channels.spi.AbstractSelector#close
public final void close() throws IOException {
    boolean open = selectorOpen.getAndSet(false);
    if (!open)
        return;
    implCloseSelector();
}
//sun.nio.ch.SelectorImpl#implCloseSelector
@Override
public final void implCloseSelector() throws IOException {
    wakeup();
    synchronized (this) {
        implClose();
        synchronized (publicSelectedKeys) {
            // Deregister channels
            Iterator<SelectionKey> i = keys.iterator();
            while (i.hasNext()) {
                SelectionKeyImpl ski = (SelectionKeyImpl)i.next();
                deregister(ski);
                SelectableChannel selch = ski.channel();
                if (!selch.isOpen() && !selch.isRegistered())
                    ((SelChImpl)selch).kill();
                selectedKeys.remove(ski);
                i.remove();
            }
            assert selectedKeys.isEmpty() && keys.isEmpty();
        }
    }
}
//sun.nio.ch.WindowsSelectorImpl#implClose
@Override
protected void implClose() throws IOException {
    assert !isOpen();
    assert Thread.holdsLock(this);

    // prevent further wakeup
    synchronized (interruptLock) {
        interruptTriggered = true;
    }

    wakeupPipe.sink().close();
    wakeupPipe.source().close();
    pollWrapper.free();

    // Make all remaining helper threads exit
    for (SelectThread t: threads)
            t.makeZombie();
    startLock.startThreads();
}

The wakeup pipe is closed as part of close: implClose closes wakeupPipe.sink() and wakeupPipe.source(), then releases the poll array through pollWrapper.free(). This is also the most difficult part of this article, so let us follow it step by step.

First, look at WindowsSelectorImpl(SelectorProvider sp):

  1. It creates a PollArrayWrapper object, pollWrapper.
  2. It obtains a pipe through Pipe.open().
  3. It gets two file descriptors, wakeupSourceFd and wakeupSinkFd.
  4. Finally, it puts wakeupSourceFd into pollWrapper.

Pipe.open()

This raises a question: why create a pipe, and what is it for?

Look at the implementation of Pipe.open():

//java.nio.channels.Pipe#open
public static Pipe open() throws IOException {
    return SelectorProvider.provider().openPipe();
}
//sun.nio.ch.SelectorProviderImpl#openPipe
public Pipe openPipe() throws IOException {
    return new PipeImpl(this);
}
//sun.nio.ch.PipeImpl#PipeImpl
PipeImpl(final SelectorProvider sp) throws IOException {
    try {
        AccessController.doPrivileged(new Initializer(sp));
    } catch (PrivilegedActionException x) {
        throw (IOException)x.getCause();
    }
}
private class Initializer
    implements PrivilegedExceptionAction<Void>
{

    private final SelectorProvider sp;

    private IOException ioe = null;

    private Initializer(SelectorProvider sp) {
        this.sp = sp;
    }

    @Override
    public Void run() throws IOException {
        LoopbackConnector connector = new LoopbackConnector();
        connector.run();
        if (ioe instanceof ClosedByInterruptException) {
            ioe = null;
            Thread connThread = new Thread(connector) {
                @Override
                public void interrupt() {}
            };
            connThread.start();
            for (;;) {
                try {
                    connThread.join();
                    break;
                } catch (InterruptedException ex) {}
            }
            Thread.currentThread().interrupt();
        }

        if (ioe != null)
            throw new IOException("Unable to establish loopback connection", ioe);

        return null;
    }

As the code shows, Pipe.open() creates a PipeImpl object whose constructor calls AccessController.doPrivileged, which in turn executes run() in Initializer. run() delegates the actual work to LoopbackConnector:

//sun.nio.ch.PipeImpl.Initializer.LoopbackConnector
private class LoopbackConnector implements Runnable {

    @Override
    public void run() {
        ServerSocketChannel ssc = null;
        SocketChannel sc1 = null;
        SocketChannel sc2 = null;

        try {
            // Create secret with a backing array.
            ByteBuffer secret = ByteBuffer.allocate(NUM_SECRET_BYTES);
            ByteBuffer bb = ByteBuffer.allocate(NUM_SECRET_BYTES);

            // Loopback address
            InetAddress lb = InetAddress.getLoopbackAddress();
            assert(lb.isLoopbackAddress());
            InetSocketAddress sa = null;
            for(;;) {
                // Bind ServerSocketChannel to a port on the loopback
                // address
                if (ssc == null || !ssc.isOpen()) {
                    ssc = ServerSocketChannel.open();
                    ssc.socket().bind(new InetSocketAddress(lb, 0));
                    sa = new InetSocketAddress(lb, ssc.socket().getLocalPort());
                }

                // Establish connection (assume connections are eagerly
                // accepted)
                sc1 = SocketChannel.open(sa);
                RANDOM_NUMBER_GENERATOR.nextBytes(secret.array());
                do {
                    sc1.write(secret);
                } while (secret.hasRemaining());
                secret.rewind();

                // Get a connection and verify it is legitimate
                sc2 = ssc.accept();
                do {
                    sc2.read(bb);
                } while (bb.hasRemaining());
                bb.rewind();

                if (bb.equals(secret))
                    break;

                sc2.close();
                sc1.close();
            }

            // Create source and sink channels
            source = new SourceChannelImpl(sp, sc1);
            sink = new SinkChannelImpl(sp, sc2);
        } catch (IOException e) {
            try {
                if (sc1 != null)
                    sc1.close();
                if (sc2 != null)
                    sc2.close();
            } catch (IOException e2) {}
            ioe = e;
        } finally {
            try {
                if (ssc != null)
                    ssc.close();
            } catch (IOException e2) {}
        }
    }
}
}

That is how the pipe is created. On Windows, the implementation creates two local SocketChannels and connects them over the loopback address; they serve as the source and the sink of the pipe respectively. We still have not seen what the pipe is used for.

Readers familiar with C/C++ system programming will know that a thread blocked in select can be woken up in three ways:

  1. Data becomes available for I/O, or an exception occurs.
  2. The timeout expires.
  3. A signal that is not blocked arrives, sent by kill or pthread_kill.

So if Selector.wakeup() is to wake up a thread blocked in select, it can only use one of these three ways:

  • The second can be ruled out: once select has blocked, its timeout can no longer be changed.
  • The third is only available on Linux; Windows has no comparable notification mechanism.

That leaves the first way. On Windows, every call to Selector.open() establishes a pair of connected TCP sockets over the loopback address; on Linux, every call opens a pipe. To wake up a thread blocked in select, it is enough to write a small amount of data to the sending end: the receiving end becomes readable and select returns.
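The same trick can be reproduced with the public API. The sketch below is not the JDK's internal code: it registers the source end of an ordinary java.nio.channels.Pipe with a selector and writes one byte into the sink, which should make the next select return with the source reported as readable. The 5-second timeout is only a safety net.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

class SelfPipeDemo {

    /** Writes one byte into the pipe and returns how many keys select() reports as ready. */
    static int selectAfterWrite() {
        try {
            Pipe pipe = Pipe.open();
            try (Pipe.SourceChannel source = pipe.source();
                 Pipe.SinkChannel sink = pipe.sink();
                 Selector selector = Selector.open()) {
                source.configureBlocking(false);
                source.register(selector, SelectionKey.OP_READ);

                // "Wake up": a single byte is enough to make the source readable.
                sink.write(ByteBuffer.wrap(new byte[] {1}));

                int ready = selector.select(5_000);

                // Drain the byte so the source is no longer readable next time.
                source.read(ByteBuffer.allocate(8));
                return ready;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("ready keys: " + selectAfterWrite());
    }
}
```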

To summarise: on Windows the JVM builds a loopback TCP connection, on Linux it builds a pipe, and in both cases Selector.wakeup() uses it to wake up the thread blocked in select.
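From the caller's point of view all of this is hidden behind Selector.wakeup(). A minimal sketch, with illustrative names: one thread blocks in select() on a selector with no registered channels, and the main thread wakes it up. The short sleep only makes it likely that the thread is already blocked; a wakeup issued before select starts is remembered and makes the next select return immediately, so the outcome does not depend on the timing.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;
import java.util.concurrent.atomic.AtomicInteger;

class WakeupDemo {

    /** Returns true if a blocked select() returned 0 after wakeup() was called. */
    static boolean wakeupUnblocksSelect() {
        try (Selector selector = Selector.open()) {
            AtomicInteger result = new AtomicInteger(-1);
            Thread selecting = new Thread(() -> {
                try {
                    // No channels are registered, so only wakeup() can end this call.
                    result.set(selector.select());
                } catch (IOException e) {
                    result.set(-2);
                }
            });
            selecting.start();
            Thread.sleep(100);
            selector.wakeup();
            selecting.join(5_000);
            return !selecting.isAlive() && result.get() == 0;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("select woken up: " + wakeupUnblocksSelect());
    }
}
```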

PollArrayWrapper

At the end of the WindowsSelectorImpl constructor we saw pollWrapper.addWakeupSocket(wakeupSourceFd, 0), which puts the file descriptor of the pipe's source end (wakeupSourceFd) into pollWrapper. pollWrapper is an instance of PollArrayWrapper, so what is that? This section explores it.

class PollArrayWrapper {

    private AllocatedNativeObject pollArray; // The fd array

    long pollArrayAddress; // pollArrayAddress

    @Native private static final short FD_OFFSET     = 0; // fd offset in pollfd
    @Native private static final short EVENT_OFFSET  = 4; // events offset in pollfd

    static short SIZE_POLLFD = 8; // sizeof pollfd struct

    private int size; // Size of the pollArray

    PollArrayWrapper(int newSize) {
        int allocationSize = newSize * SIZE_POLLFD;
        pollArray = new AllocatedNativeObject(allocationSize, true);
        pollArrayAddress = pollArray.address();
        this.size = newSize;
    }

    ...

    // Access methods for fd structures
    void putDescriptor(int i, int fd) {
        pollArray.putInt(SIZE_POLLFD * i + FD_OFFSET, fd);
    }

    void putEventOps(int i, int event) {
        pollArray.putShort(SIZE_POLLFD * i + EVENT_OFFSET, (short)event);
    }
    ...
   // Adds Windows wakeup socket at a given index.
    void addWakeupSocket(int fdVal, int index) {
        putDescriptor(index, fdVal);
        putEventOps(index, Net.POLLIN);
    }
}

addWakeupSocket, at the end of the code, stores POLLIN as the EventOps value of the entry in pollArray. The write goes directly to native memory through unsafe: the value of Net.POLLIN is written at offset SIZE_POLLFD * i + EVENT_OFFSET from the start address of pollArray. putDescriptor works the same way. When data is written to the sink, the file descriptor wakeupSourceFd of the source becomes ready.

//java.base/windows/native/libnio/ch/nio_util.h
    /* WSAPoll()/WSAPOLLFD and the corresponding constants are only defined   */
    /* in Windows Vista / Windows Server 2008 and later. If we are on an      */
    /* older release we just use the Solaris constants as this was previously */
    /* done in PollArrayWrapper.java.                                         */
    #define POLLIN       0x0001
    #define POLLOUT      0x0004
    #define POLLERR      0x0008
    #define POLLHUP      0x0010
    #define POLLNVAL     0x0020
    #define POLLCONN     0x0002
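The layout of pollArray can be imitated with a direct ByteBuffer. This is only a sketch of the addressing scheme, not how the JDK does it (the real class uses Unsafe through AllocatedNativeObject); the getters are added here purely so the values can be read back. Each entry takes 8 bytes: a 4-byte fd at offset 0 and a 2-byte events field at offset 4.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

class PollArraySketch {
    static final int FD_OFFSET = 0;     // fd offset within one entry
    static final int EVENT_OFFSET = 4;  // events offset within one entry
    static final int SIZE_POLLFD = 8;   // size of one entry in bytes
    static final short POLLIN = 0x0001; // same value as in nio_util.h

    private final ByteBuffer pollArray;

    PollArraySketch(int entries) {
        // Off-heap memory in native byte order, standing in for the Unsafe allocation.
        pollArray = ByteBuffer.allocateDirect(entries * SIZE_POLLFD)
                              .order(ByteOrder.nativeOrder());
    }

    static int eventsOffsetOf(int i) {
        return SIZE_POLLFD * i + EVENT_OFFSET;
    }

    void putDescriptor(int i, int fd) {
        pollArray.putInt(SIZE_POLLFD * i + FD_OFFSET, fd);
    }

    void putEventOps(int i, int event) {
        pollArray.putShort(eventsOffsetOf(i), (short) event);
    }

    int getDescriptor(int i) {
        return pollArray.getInt(SIZE_POLLFD * i + FD_OFFSET);
    }

    short getEventOps(int i) {
        return pollArray.getShort(eventsOffsetOf(i));
    }

    /** Same two writes as PollArrayWrapper#addWakeupSocket. */
    void addWakeupSocket(int fdVal, int index) {
        putDescriptor(index, fdVal);
        putEventOps(index, POLLIN);
    }

    public static void main(String[] args) {
        PollArraySketch sketch = new PollArraySketch(8);
        sketch.addWakeupSocket(42, 0); // 42 is an arbitrary example fd
        System.out.println("fd=" + sketch.getDescriptor(0) + " events=" + sketch.getEventOps(0));
    }
}
```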

NativeObject, the parent class of AllocatedNativeObject, contains many unsafe operations that work at the memory level. Its constructor also shows clearly that pollArray is allocated with unsafe.allocateMemory(size + ps):

class AllocatedNativeObject                             // package-private
    extends NativeObject
{
    /**
     * Allocates a memory area of at least {@code size} bytes outside of the
     * Java heap and creates a native object for that area.
     */
    AllocatedNativeObject(int size, boolean pageAligned) {
        super(size, pageAligned);
    }

    /**
     * Frees the native memory area associated with this object.
     */
    synchronized void free() {
        if (allocationAddress != 0) {
            unsafe.freeMemory(allocationAddress);
            allocationAddress = 0;
        }
    }

}
//sun.nio.ch.NativeObject#NativeObject(int, boolean)
protected NativeObject(int size, boolean pageAligned) {
        if (!pageAligned) {
            this.allocationAddress = unsafe.allocateMemory(size);
            this.address = this.allocationAddress;
        } else {
            int ps = pageSize();
            long a = unsafe.allocateMemory(size + ps);
            this.allocationAddress = a;
            this.address = a + ps - (a & (ps - 1));
        }
    }
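The alignment arithmetic in the page-aligned branch is worth a closer look. The sketch below isolates the expression a + ps - (a & (ps - 1)), which assumes ps is a power of two; the addresses used are made-up examples. Note that the result is always strictly greater than a, even when a is already page-aligned, which is why the constructor allocates size + ps bytes rather than size.

```java
class PageAlignDemo {

    /** Rounds address a up to the next multiple of ps (ps must be a power of two). */
    static long align(long a, int ps) {
        return a + ps - (a & (ps - 1));
    }

    public static void main(String[] args) {
        int ps = 4096;
        // Unaligned address: moves up to the next page boundary.
        System.out.println(Long.toHexString(align(0x1003L, ps)));
        // Already aligned address: still moves up by a whole page.
        System.out.println(Long.toHexString(align(0x2000L, ps)));
    }
}
```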

That completes this part of Selector.open(). Its main task is to build the pipe and put wakeupSourceFd, the descriptor of the pipe's source end, into pollArray. pollArray plays a very important role in how the Selector does its work. This part focused mainly on the Windows implementation: on Windows the pipe is implemented with two connected SocketChannels, whereas on Linux it only needs to call the system's pipe function.

Management of SelectionKey in selector

Register SelectionKey on Selector

So what is registration? It means putting an object into a container field of the target class; the field may be an array, a queue, a set, or a list. The same holds here, with one difference: registration also returns a value. So the object is put into a collection and then returned.

//sun.nio.ch.SelectorImpl#register
@Override
protected final SelectionKey register(AbstractSelectableChannel ch,
                                        int ops,
                                        Object attachment)
{
    if (!(ch instanceof SelChImpl))
        throw new IllegalSelectorException();
    SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);
    k.attach(attachment);

    // register (if needed) before adding to key set
    implRegister(k);

    // add to the selector's key set, removing it immediately if the selector
    // is closed. The key is not in the channel's key set at this point but
    // it may be observed by a thread iterating over the selector's key set.
    keys.add(k);
    try {
        k.interestOps(ops);
    } catch (ClosedSelectorException e) {
        assert ch.keyFor(this) == null;
        keys.remove(k);
        k.cancel();
        throw e;
    }
    return k;
}
//sun.nio.ch.WindowsSelectorImpl#implRegister
@Override
protected void implRegister(SelectionKeyImpl ski) {
    ensureOpen();
    synchronized (updateLock) {
        newKeys.addLast(ski);
    }
}

We have seen this before; it is repeated here for review.

register first builds a SelectionKeyImpl object, which wraps the Channel and also holds a reference to the Selector, so the Selector can be obtained from the SelectionKey.

Next comes implRegister. The Windows implementation first checks that the Selector is open with ensureOpen(), then adds the SelectionKeyImpl to newKeys in WindowsSelectorImpl, the queue of newly registered keys that have not been processed yet. newKeys is an ArrayDeque.

After that, the SelectionKeyImpl is added to sun.nio.ch.SelectorImpl#keys, a Set<SelectionKey> holding the keys registered with this selector. See the sun.nio.ch.SelectorImpl constructor:

//sun.nio.ch.SelectorImpl#SelectorImpl
protected SelectorImpl(SelectorProvider sp) {
    super(sp);
    keys = ConcurrentHashMap.newKeySet();
    selectedKeys = new HashSet<>();
    publicKeys = Collections.unmodifiableSet(keys);
    publicSelectedKeys = Util.ungrowableSet(selectedKeys);
}

publicKeys is a view of keys, but it is read-only. To find out which keys are registered with the current Selector, call sun.nio.ch.SelectorImpl#keys:

//sun.nio.ch.SelectorImpl#keys
@Override
public final Set<SelectionKey> keys() {
    ensureOpen();
    return publicKeys;
}

Back in the constructor, selectedKeys holds the keys that have been selected: the SelectionKeys whose Channels were found ready in a previous selection operation. This set is a subset of keys and is obtained through selector.selectedKeys().

//sun.nio.ch.SelectorImpl#selectedKeys
@Override
public final Set<SelectionKey> selectedKeys() {
    ensureOpen();
    return publicSelectedKeys;
}

It returns publicSelectedKeys: elements can be removed from this set, but nothing can be added to it directly.
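The different restrictions on the two views can be probed from outside. In this sketch (illustrative names, Pipe used as a convenient selectable channel) any modification of keys() should fail, while selectedKeys() should reject add but accept remove.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Arrays;

class KeySetViewsDemo {

    /** Returns {keys() rejects remove, selectedKeys() rejects add, selectedKeys() allows remove}. */
    static boolean[] probe() {
        try {
            Pipe pipe = Pipe.open();
            try (Pipe.SourceChannel source = pipe.source();
                 Pipe.SinkChannel sink = pipe.sink();
                 Selector selector = Selector.open()) {
                source.configureBlocking(false);
                SelectionKey key = source.register(selector, SelectionKey.OP_READ);
                boolean keysRejectRemove =
                        throwsUnsupported(() -> selector.keys().remove(key));
                boolean selectedRejectsAdd =
                        throwsUnsupported(() -> selector.selectedKeys().add(key));
                boolean selectedAllowsRemove =
                        !throwsUnsupported(() -> selector.selectedKeys().remove(key));
                return new boolean[] {keysRejectRemove, selectedRejectsAdd, selectedAllowsRemove};
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private static boolean throwsUnsupported(Runnable action) {
        try {
            action.run();
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(probe()));
    }
}
```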

An earlier section covered the cancellation of a SelectionKey. For that purpose java.nio.channels.spi.AbstractSelector defines a HashSet named cancelledKeys, which holds the SelectionKeys that have been cancelled but not yet deregistered. This set cannot be accessed directly, and it is also a subset of keys().

In a newly created Selector, all three sets are empty. As the source code shows, channel.register creates a SelectionKey and adds it to keys.

If selectionKey.cancel() is invoked, the key is added to cancelledKeys; when cancelledKeys is not empty, the next selection operation of the Selector deregisters those keys and releases their resources. Whether we call channel.close() or selectionKey.cancel(), the SelectionKey ends up in cancelledKeys. During each selection operation, keys may be added to selectedKeys and removed from cancelledKeys.
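The deferred removal can be observed with a few calls. A minimal sketch with illustrative names: the key becomes invalid as soon as cancel() returns, yet it stays in keys() until the next selection operation (selectNow() here) processes the cancelled keys.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Arrays;

class CancelLifecycleDemo {

    /** Returns {invalid right after cancel, still in keys() before select, gone after select, channel deregistered}. */
    static boolean[] observe() {
        try {
            Pipe pipe = Pipe.open();
            try (Pipe.SourceChannel source = pipe.source();
                 Pipe.SinkChannel sink = pipe.sink();
                 Selector selector = Selector.open()) {
                source.configureBlocking(false);
                SelectionKey key = source.register(selector, SelectionKey.OP_READ);

                key.cancel();
                boolean invalidImmediately = !key.isValid();
                boolean stillListed = selector.keys().contains(key);

                selector.selectNow(); // processes the cancelled-key set

                boolean removedAfterSelect = !selector.keys().contains(key);
                boolean deregistered = source.keyFor(selector) == null;
                return new boolean[] {invalidImmediately, stillListed, removedAfterSelect, deregistered};
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(observe()));
    }
}
```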

select in Selector

Now we can focus on select. The Selector API offers select in two forms. The first consists of select(), selectNow() and select(long timeout). The second consists of select(Consumer<SelectionKey> action, long timeout), select(Consumer<SelectionKey> action) and selectNow(Consumer<SelectionKey> action). The second form was added in JDK 11; it performs a custom action on each selected key whose channel is ready for I/O during the selection operation.

Note that select(Consumer<SelectionKey> action) and its timeout variant are blocking calls. They return only after at least one channel is selected, the selector's wakeup method is invoked, the current thread is interrupted, or the timeout expires, whichever comes first.
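A short sketch of the Consumer form, assuming JDK 11 or later and using a Pipe that already has one byte written to it so the call returns at once. The action runs for each ready key, the return value counts the keys consumed, and the selected-key set is left untouched by this form.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

class SelectConsumerDemo {

    /** Returns {keys consumed, times the action saw a readable key, size of selectedKeys()}. */
    static int[] run() {
        try {
            Pipe pipe = Pipe.open();
            try (Pipe.SourceChannel source = pipe.source();
                 Pipe.SinkChannel sink = pipe.sink();
                 Selector selector = Selector.open()) {
                source.configureBlocking(false);
                source.register(selector, SelectionKey.OP_READ);
                sink.write(ByteBuffer.wrap(new byte[] {1})); // make the source readable

                AtomicInteger handled = new AtomicInteger();
                int consumed = selector.select(key -> {
                    if (key.isReadable()) {
                        handled.incrementAndGet();
                    }
                }, 5_000);

                return new int[] {consumed, handled.get(), selector.selectedKeys().size()};
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run()));
    }
}
```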

//sun.nio.ch.SelectorImpl
@Override
public final int select(long timeout) throws IOException {
    if (timeout < 0)
        throw new IllegalArgumentException("Negative timeout");
    return lockAndDoSelect(null, (timeout == 0) ? -1 : timeout);
}

//sun.nio.ch.SelectorImpl
@Override
public final int select(Consumer<SelectionKey> action, long timeout)
    throws IOException
{
    Objects.requireNonNull(action);
    if (timeout < 0)
        throw new IllegalArgumentException("Negative timeout");
    return lockAndDoSelect(action, (timeout == 0) ? -1 : timeout);
}
//sun.nio.ch.SelectorImpl#lockAndDoSelect
private int lockAndDoSelect(Consumer<SelectionKey> action, long timeout)
        throws IOException
    {
        synchronized (this) {
            ensureOpen();
            if (inSelect)
                throw new IllegalStateException("select in progress");
            inSelect = true;
            try {
                synchronized (publicSelectedKeys) {
                    return doSelect(action, timeout);
                }
            } finally {
                inSelect = false;
            }
        }
    }

Whichever select we call, it ends up in lockAndDoSelect, which executes doSelect(action, timeout); doSelect has a different implementation on each platform. Note that a timeout of 0, which the API defines as blocking indefinitely, is passed on internally as -1.

We take sun.nio.ch.WindowsSelectorImpl#doSelect as the example and walk through it:

// sun.nio.ch.WindowsSelectorImpl#doSelect
@Override
protected int doSelect(Consumer<SelectionKey> action, long timeout)
    throws IOException
    {
        assert Thread.holdsLock(this);
        this.timeout = timeout; // set selector timeout
        processUpdateQueue();  // <1>
        processDeregisterQueue(); // <2>
        if (interruptTriggered) {
            resetWakeupSocket();
            return 0;
        }
        // Calculate number of helper threads needed for poll. If necessary
        // threads are created here and start waiting on startLock
        adjustThreadsCount();
        finishLock.reset(); // reset finishLock
        // Wakeup helper threads, waiting on startLock, so they start polling.
        // Redundant threads will exit here after wakeup.
        startLock.startThreads();
        // do polling in the main thread. Main thread is responsible for
        // first MAX_SELECTABLE_FDS entries in pollArray.
        try {
            begin();
            try {
                subSelector.poll();  // <3>
            } catch (IOException e) {
                finishLock.setException(e); // Save this exception
            }
            // Main thread is out of poll(). Wakeup others and wait for them
            if (threads.size() > 0)
                finishLock.waitForHelperThreads();
          } finally {
              end();
          }
        // Done with poll(). Set wakeupSocket to nonsignaled  for the next run.
        finishLock.checkForException();
        processDeregisterQueue();  // <4>
        int updated = updateSelectedKeys(action); // <5>
        // Done with poll(). Set wakeupSocket to nonsignaled  for the next run.
        resetWakeupSocket(); // <6>
        return updated;
    }

processUpdateQueue

  1. First, as the platform-specific class (WindowsSelectorImpl) shows, processUpdateQueue picks up the current interestOps of the remaining Channels, some of which may already have been cancelled. This covers newly registered SelectionKeys (newKeys) and keys whose interest ops have changed (updateKeys); both are handed to pollWrapper.
  • For a newly registered SelectionKeyImpl, the file descriptor and the associated EventOps (initially 0) are stored in pollArray at offsets SIZE_POLLFD * totalChannels + FD_OFFSET and SIZE_POLLFD * totalChannels + EVENT_OFFSET respectively.

  • For updateKeys, the entry already exists at its position in pollArray, so only the key's validity needs to be checked. If the key is valid, the current interestOps of the SelectionKeyImpl is written to the EventOps of its entry in pollWrapper.

Once a key from newKeys has passed the validity check, growIfNeeded() is invoked. It first tests channelArray.length == totalChannels. channelArray is an array of SelectionKeyImpl with an initial size of 8, which lets the Selector manage the registered SelectionKeyImpl objects. If its length equals totalChannels (initially 1), channelArray is expanded and, more importantly, pollWrapper is expanded along with it.

In addition, when totalChannels % MAX_SELECTABLE_FDS == 0, one more thread is needed to serve the selector. A single select call on Windows can handle at most 1024 file descriptors, so more than 1024 descriptors have to be handled by multiple threads. At that point pollWrapper.addWakeupSocket(wakeupSourceFd, totalChannels) writes the fdVal of wakeupSourceFd into pollArray at offset SIZE_POLLFD * totalChannels + FD_OFFSET. This way each helper thread created for the extra descriptors also watches wakeupSourceFd within its own block of MAX_SELECTABLE_FDS entries, so it can be woken up together with the Selector. Finally, ski.setIndex(totalChannels) records the index of the SelectionKeyImpl for later use.
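The bookkeeping described above can be followed with a pure simulation. The class below is a hypothetical model, not JDK code: it keeps only the three counters (capacity stands in for channelArray.length) and repeats the control flow of growIfNeeded() and the new-key branch of processUpdateQueue(), so we can see when the arrays double and when a helper thread becomes necessary.

```java
class GrowSimulation {
    static final int INIT_CAP = 8;
    static final int MAX_SELECTABLE_FDS = 1024;

    int capacity = INIT_CAP; // stands in for channelArray.length
    int totalChannels = 1;   // slot 0 already holds the wakeup socket
    int threadsCount = 0;    // helper threads needed besides the main thread

    /** Same conditions as growIfNeeded(), without any native memory. */
    void growIfNeeded() {
        if (capacity == totalChannels) {
            capacity = totalChannels * 2; // channelArray and pollWrapper double together
        }
        if (totalChannels % MAX_SELECTABLE_FDS == 0) {
            totalChannels++;              // slot reserved for another wakeup socket entry
            threadsCount++;
        }
    }

    /** Models registering one new, valid key; returns the index it is stored at. */
    int register() {
        growIfNeeded();
        int index = totalChannels;
        totalChannels++;
        return index;
    }

    static GrowSimulation after(int registrations) {
        GrowSimulation sim = new GrowSimulation();
        for (int i = 0; i < registrations; i++) {
            sim.register();
        }
        return sim;
    }

    public static void main(String[] args) {
        for (int n : new int[] {7, 8, 1023, 1024}) {
            GrowSimulation sim = after(n);
            System.out.println(n + " keys: capacity=" + sim.capacity
                    + " totalChannels=" + sim.totalChannels
                    + " threadsCount=" + sim.threadsCount);
        }
    }
}
```

In this model the 1024th registered key is the first one to land in a helper thread's block: slot 1024 is taken by the extra wakeup entry and the key itself is stored at index 1025.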

  /**
    * sun.nio.ch.WindowsSelectorImpl#processUpdateQueue
    * Process new registrations and changes to the interest ops.
    */
private void processUpdateQueue() {
    assert Thread.holdsLock(this);

    synchronized (updateLock) {
        SelectionKeyImpl ski;

        // new registrations
        while ((ski = newKeys.pollFirst()) != null) {
            if (ski.isValid()) {
                growIfNeeded();
                channelArray[totalChannels] = ski;
                ski.setIndex(totalChannels);
                pollWrapper.putEntry(totalChannels, ski);
                totalChannels++;
                MapEntry previous = fdMap.put(ski);
                assert previous == null;
            }
        }

        // changes to interest ops
        while ((ski = updateKeys.pollFirst()) != null) {
            int events = ski.translateInterestOps();
            int fd = ski.getFDVal();
            if (ski.isValid() && fdMap.containsKey(fd)) {
                int index = ski.getIndex();
                assert index >= 0 && index < totalChannels;
                pollWrapper.putEventOps(index, events);
            }
        }
    }
}

//sun.nio.ch.PollArrayWrapper#putEntry
// Prepare another pollfd struct for use.
void putEntry(int index, SelectionKeyImpl ski) {
    putDescriptor(index, ski.getFDVal());
    putEventOps(index, 0);
}
//sun.nio.ch.WindowsSelectorImpl#growIfNeeded
private void growIfNeeded() {
    if (channelArray.length == totalChannels) {
        int newSize = totalChannels * 2; // Make a larger array
        SelectionKeyImpl temp[] = new SelectionKeyImpl[newSize];
        System.arraycopy(channelArray, 1, temp, 1, totalChannels - 1);
        channelArray = temp;
        pollWrapper.grow(newSize);
    }
    if (totalChannels % MAX_SELECTABLE_FDS == 0) { // more threads needed
        pollWrapper.addWakeupSocket(wakeupSourceFd, totalChannels);
        totalChannels++;
        threadsCount++;
    }
}
// Initial capacity of the poll array
private final int INIT_CAP = 8;
// Maximum number of sockets for select().
// Should be INIT_CAP times a power of 2
private static final int MAX_SELECTABLE_FDS = 1024;

// The list of SelectableChannels serviced by this Selector. Every mod
// MAX_SELECTABLE_FDS entry is bogus, to align this array with the poll
// array,  where the corresponding entry is occupied by the wakeupSocket
private SelectionKeyImpl[] channelArray = new SelectionKeyImpl[INIT_CAP];
// The number of valid entries in  poll array, including entries occupied
// by wakeup socket handle.
private int totalChannels = 1;

//sun.nio.ch.PollArrayWrapper#grow
// Grows the pollfd array to new size
void grow(int newSize) {
    PollArrayWrapper temp = new PollArrayWrapper(newSize);
    for (int i = 0; i < size; i++)
        replaceEntry(this, i, temp, i);
    pollArray.free();
    pollArray = temp.pollArray;
    this.size = temp.size;
    pollArrayAddress = pollArray.address();
}

// Maps file descriptors to their indices in  pollArray
private static final class FdMap extends HashMap<Integer, MapEntry> {
    static final long serialVersionUID = 0L;
    private MapEntry get(int desc) {
        return get(Integer.valueOf(desc));
    }
    private MapEntry put(SelectionKeyImpl ski) {
        return put(Integer.valueOf(ski.getFDVal()), new MapEntry(ski));
    }
    private MapEntry remove(SelectionKeyImpl ski) {
        Integer fd = Integer.valueOf(ski.getFDVal());
        MapEntry x = get(fd);
        if ((x != null) && (x.ski.channel() == ski.channel()))
            return remove(fd);
        return null;
    }
}

// class for fdMap entries
private static final class MapEntry {
    final SelectionKeyImpl ski;
    long updateCount = 0;
    MapEntry(SelectionKeyImpl ski) {
        this.ski = ski;
    }
}
private final FdMap fdMap = new FdMap();

processDeregisterQueue

  1. Now we explore processDeregisterQueue at WindowsSelectorImpl#doSelect <2>.
  • It first traverses cancelledKeys, deregisters each key, and removes it from cancelledKeys. The key is also removed from keys and selectedKeys, which releases the references and helps garbage collection.
  • Internally it invokes implDereg, which removes the SelectionKeyImpl associated with the channel from channelArray, adjusts totalChannels and the number of threads, and removes the SelectionKeyImpl from fdMap. The key is then removed from keys and from the Channel's own key set, after which the Channel may be closed.
  • processDeregisterQueue() should be invoked both before and after poll, so that keys cancelled while poll is blocking are cleaned up promptly.
  • Finally, it checks whether the Channel of the cancelled key is still open or registered. If the Channel is closed and no longer registered, its file descriptor is released and closed through kill().
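
From the user's side, the deferred cleanup described above can be observed with the public API. The sketch below is only an illustration: it uses a Pipe as a convenient selectable channel, and CancelDemo is a hypothetical class name.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

class CancelDemo {
    // Returns {key count right after cancel(), key count after the next selection}.
    static int[] keyCountsAroundSelect() {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            try {
                pipe.source().configureBlocking(false);
                SelectionKey key = pipe.source().register(selector, SelectionKey.OP_READ);
                key.cancel();                        // only queues the key as cancelled
                int before = selector.keys().size(); // the key is still in the key set
                selector.selectNow();                // the selection operation drains cancelled keys
                int after = selector.keys().size();
                return new int[] { before, after };
            } finally {
                pipe.source().close();
                pipe.sink().close();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] counts = keyCountsAroundSelect();
        System.out.println("before=" + counts[0] + " after=" + counts[1]);
    }
}
```

The key leaves the selector's key set only when a selection operation runs, which is where processDeregisterQueue() is called.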
   /**
    * sun.nio.ch.SelectorImpl#processDeregisterQueue
    * Invoked by selection operations to process the cancelled-key set
    */
protected final void processDeregisterQueue() throws IOException {
    assert Thread.holdsLock(this);
    assert Thread.holdsLock(publicSelectedKeys);

    Set<SelectionKey> cks = cancelledKeys();
    synchronized (cks) {
        if (!cks.isEmpty()) {
            Iterator<SelectionKey> i = cks.iterator();
            while (i.hasNext()) {
                SelectionKeyImpl ski = (SelectionKeyImpl)i.next();
                i.remove();

                // remove the key from the selector
                implDereg(ski);

                selectedKeys.remove(ski);
                keys.remove(ski);

                // remove from channel's key set
                deregister(ski);

                SelectableChannel ch = ski.channel();
                if (!ch.isOpen() && !ch.isRegistered())
                    ((SelChImpl)ch).kill();
            }
        }
    }
}
//sun.nio.ch.WindowsSelectorImpl#implDereg
@Override
protected void implDereg(SelectionKeyImpl ski) {
    assert !ski.isValid();
    assert Thread.holdsLock(this);

    if (fdMap.remove(ski) != null) {
        int i = ski.getIndex();
        assert (i >= 0);

        if (i != totalChannels - 1) {
            // Copy end one over it
            SelectionKeyImpl endChannel = channelArray[totalChannels-1];
            channelArray[i] = endChannel;
            endChannel.setIndex(i);
            pollWrapper.replaceEntry(pollWrapper, totalChannels-1, pollWrapper, i);
        }
        ski.setIndex(-1);

        channelArray[totalChannels - 1] = null;
        totalChannels--;
        if (totalChannels != 1 && totalChannels % MAX_SELECTABLE_FDS == 1) {
            totalChannels--;
            threadsCount--; // The last thread has become redundant.
        }
    }
}

//sun.nio.ch.SocketChannelImpl#kill
@Override
public void kill() throws IOException {
    synchronized (stateLock) {
        if (state == ST_KILLPENDING) {
            state = ST_KILLED;
            nd.close(fd);
        }
    }
}
//sun.nio.ch.SocketChannelImpl static initializer (JDK 11.0.1, SocketChannelImpl.java:1126)
static {
    IOUtil.load();
    nd = new SocketDispatcher();
}
//sun.nio.ch.SocketDispatcher#close
void close(FileDescriptor fd) throws IOException {
    close0(fd);
}
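
The removal strategy used by implDereg (copy the last entry over the removed slot, then shrink) can be sketched in isolation as follows. SwapRemoveSketch is a hypothetical helper that works on a String array instead of channelArray, and it leaves out the index bookkeeping done by setIndex.

```java
import java.util.Arrays;

class SwapRemoveSketch {
    // Removes slots[i] from the first `count` entries by moving the last entry
    // over it, as implDereg does with channelArray. Returns the new count.
    static int removeAt(String[] slots, int count, int i) {
        int last = count - 1;
        if (i != last) {
            slots[i] = slots[last]; // copy the end one over it
        }
        slots[last] = null;         // release the reference
        return count - 1;
    }

    public static void main(String[] args) {
        String[] slots = { "wakeup", "a", "b", "c", null };
        int count = removeAt(slots, 4, 1);
        System.out.println(count + " " + Arrays.toString(slots));
    }
}
```

This keeps the used part of the array dense without shifting every later element, at the cost of changing the order of entries, which is why the moved key must be told its new index.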

adjustThreadsCount

  1. Now let's explore adjustThreadsCount.
  • As mentioned above, when totalChannels % MAX_SELECTABLE_FDS == 0 is true, another thread is needed to handle the selector. adjustThreadsCount brings the number of running threads in line with threadsCount. In other words, it adjusts the number of threads according to the maximum number of file descriptors the system allows per select.
  • Next we look at what the new thread does, i.e. the run method of SelectThread. The loop condition is while (true), and startLock.waitForStart(this) controls whether the thread runs or waits. When it runs, it invokes subSelector.poll(index).
  • Several SelectThread helper threads may be attached to the main thread. When poll finishes, each SelectThread calls finishLock.threadFinished(). The one that finishes first wakes up the others, and the main thread is notified once all helper threads have finished. When run starts in a newly created thread, lastRun equals sun.nio.ch.WindowsSelectorImpl.StartLock#runsCounter (both are 0 on the first startup), so the thread calls startLock.wait() and waits.
  • startLock.waitForStart (in sun.nio.ch.WindowsSelectorImpl.StartLock) checks whether the current thread has been discarded. If it has, it returns true, and the thread leaves the while loop in run and terminates.
  • Both adjustThreadsCount and closing the Selector (sun.nio.ch.WindowsSelectorImpl#implClose) release the Selector's helper threads through sun.nio.ch.WindowsSelectorImpl.SelectThread#makeZombie.
  • finishLock.threadFinished() uses wakeup() to wake the other threads. There is a detail worth learning here: if a thread is currently blocked in select, wakeup makes the blocked select return. On Windows, it simply writes a byte to the sink side of a pipe, the source fd becomes ready, and poll returns, which in turn makes select return. On Solaris and Linux, the wakeup channel is a pipe created by the operating system. From the implementation of wakeup we can also see that it sets the interruptTriggered flag, so invoking wakeup many times has the same effect as invoking it once and causes no bugs.
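
The wakeup behavior described in the last point can be checked with the public Selector API. The following is a minimal sketch (WakeupDemo is a hypothetical class, and the timeouts are arbitrary): two wakeups are issued before any selection, so the first select should return at once and the second should wait for its timeout.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;

class WakeupDemo {
    // Returns {milliseconds spent in the first select, milliseconds spent in the second}.
    static long[] timeTwoSelects() {
        try (Selector selector = Selector.open()) {
            selector.wakeup();
            selector.wakeup();     // repeated wakeups collapse into a single one
            long t0 = System.nanoTime();
            selector.select(5000); // consumes the pending wakeup and returns
            long t1 = System.nanoTime();
            selector.select(300);  // nothing pending: waits for the timeout
            long t2 = System.nanoTime();
            return new long[] { (t1 - t0) / 1_000_000, (t2 - t1) / 1_000_000 };
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        long[] ms = timeTwoSelects();
        System.out.println("first=" + ms[0] + "ms second=" + ms[1] + "ms");
    }
}
```

Exact timings depend on the machine, so the sketch only reports them.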
//sun.nio.ch.WindowsSelectorImpl#adjustThreadsCount
// After some channels registered/deregistered, the number of required
// helper threads may have changed. Adjust this number.
private void adjustThreadsCount() {
    if (threadsCount > threads.size()) {
        // More threads needed. Start more threads.
        for (int i = threads.size(); i < threadsCount; i++) {
            SelectThread newThread = new SelectThread(i);
            threads.add(newThread);
            newThread.setDaemon(true);
            newThread.start();
        }
    } else if (threadsCount < threads.size()) {
        // Some threads become redundant. Remove them from the threads List.
        for (int i = threads.size() - 1 ; i >= threadsCount; i--)
            threads.remove(i).makeZombie();
    }
}

//sun.nio.ch.WindowsSelectorImpl.SelectThread
// Represents a helper thread used for select.
private final class SelectThread extends Thread {
    private final int index; // index of this thread
    final SubSelector subSelector;
    private long lastRun = 0; // last run number
    private volatile boolean zombie;
    // Creates a new thread
    private SelectThread(int i) {
        super(null, null, "SelectorHelper", 0, false);
        this.index = i;
        this.subSelector = new SubSelector(i);
        //make sure we wait for next round of poll
        this.lastRun = startLock.runsCounter;
    }
    void makeZombie() {
        zombie = true;
    }
    boolean isZombie() {
        return zombie;
    }
    public void run() {
        while (true) { // poll loop
            // wait for the start of poll. If this thread has become
            // redundant, then exit.
            if (startLock.waitForStart(this))
                return;
            // call poll()
            try {
                subSelector.poll(index);
            } catch (IOException e) {
                // Save this exception and let other threads finish.
                finishLock.setException(e);
            }
            // notify main thread, that this thread has finished, and
            // wakeup others, if this thread is the first to finish.
            finishLock.threadFinished();
        }
    }
}

// sun.nio.ch.WindowsSelectorImpl.FinishLock#threadFinished
// Each helper thread invokes this function on finishLock, when
// the thread is done with poll().
private synchronized void threadFinished() {
    if (threadsToFinish == threads.size()) { // finished poll() first
        // if finished first, wakeup others
        wakeup();
    }
    threadsToFinish--;
    if (threadsToFinish == 0) // all helper threads finished poll().
        notify();             // notify the main thread
}

//sun.nio.ch.WindowsSelectorImpl#wakeup
@Override
public Selector wakeup() {
    synchronized (interruptLock) {
        if (!interruptTriggered) {
            setWakeupSocket();
            interruptTriggered = true;
        }
    }
    return this;
}
//sun.nio.ch.WindowsSelectorImpl#setWakeupSocket
// Sets Windows wakeup socket to a signaled state.
private void setWakeupSocket() {
    setWakeupSocket0(wakeupSinkFd);
}
private native void setWakeupSocket0(int wakeupSinkFd);

JNIEXPORT void JNICALL
Java_sun_nio_ch_WindowsSelectorImpl_setWakeupSocket0(JNIEnv *env, jclass this,
                                                jint scoutFd)
{
    /* Write one byte into the pipe */
    const char byte = 1;
    send(scoutFd, &byte, 1, 0);
}

poll in subSelector

  1. subSelector.poll() is the core of select and is implemented by the native function poll0, which takes pollWrapper.pollArrayAddress as a parameter. The readFds, writeFds and exceptFds arrays hold the results of the underlying select. The first element of each array holds the number of selected sockets, and the remaining elements hold the FDs of those sockets. From the following code, we can see:

    poll0 listens on the FDs in pollWrapper for incoming or outgoing data and blocks until an IO operation occurs. Since pollWrapper also holds the FD of the ServerSocketChannel, poll0 returns as soon as a client socket sends data to the server socket. pollWrapper also holds the source FD of the wakeup pipe, so writing data to the sink side of the pipe makes poll0 return as well. If neither happens, poll0() stays blocked. If either happens, selector.select returns. The while (true) loop in SelectThread#run ensures that after the selector has received and handled data, the thread goes back to listening in poll().

So NIO is still a blocking kind of IO. What, then, is the difference between NIO and BIO? The difference is where the blocking happens: BIO blocks at read (recvfrom), while NIO blocks at select. What do we gain from that? Merely moving the blocking point would change nothing. The real gain comes from implementations such as epoll, which uses callbacks on the listened sockets to tell us which data are ready, so we only need to handle the data that is ready. With BIO and 1000 connections, there would be 1000 threads blocked at read. With NIO, a single thread is enough. It uses the polling strategy of select, combined with epoll and its red-black tree data structure, to lower the performance cost and reduce the overhead of context switching.
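
The "one thread, many connections" idea can be sketched with the public API. In this illustrative example, pipes stand in for network connections, and SingleThreadMultiplexDemo and its method names are hypothetical. The only blocking point is selector.select().

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

class SingleThreadMultiplexDemo {
    // Reads one short message from each of `connections` pipes using a single thread.
    static List<String> readAll(int connections) {
        List<Pipe> pipes = new ArrayList<>();
        List<String> received = new ArrayList<>();
        try (Selector selector = Selector.open()) {
            for (int i = 0; i < connections; i++) {
                Pipe pipe = Pipe.open();
                pipes.add(pipe);
                pipe.source().configureBlocking(false);
                pipe.source().register(selector, SelectionKey.OP_READ, "conn-" + i);
                pipe.sink().write(ByteBuffer.wrap(("hello " + i).getBytes(StandardCharsets.UTF_8)));
            }
            ByteBuffer buf = ByteBuffer.allocate(64);
            while (received.size() < connections) {
                selector.select(); // the only place where this thread blocks
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();
                    buf.clear();
                    int n = ((Pipe.SourceChannel) key.channel()).read(buf);
                    if (n > 0) {
                        String text = new String(buf.array(), 0, n, StandardCharsets.UTF_8);
                        received.add(key.attachment() + ": " + text);
                        key.cancel(); // one message per connection is enough here
                    }
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            for (Pipe p : pipes) {
                try {
                    p.sink().close();
                    p.source().close();
                } catch (IOException ignored) {
                    // best-effort cleanup in a demo
                }
            }
        }
        return received;
    }

    public static void main(String[] args) {
        readAll(3).forEach(System.out::println);
    }
}
```

The order in which the messages arrive is not guaranteed, since it depends on the order in which the selector reports the ready keys.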

//sun.nio.ch.WindowsSelectorImpl.SubSelector
private final class SubSelector {
        private final int pollArrayIndex; // starting index in pollArray to poll
        // These arrays will hold result of native select().
        // The first element of each array is the number of selected sockets.
        // Other elements are file descriptors of selected sockets.
        // Holds the FDs on which a read event occurred
        private final int[] readFds = new int [MAX_SELECTABLE_FDS + 1];
        // Holds the FDs on which a write event occurred
        private final int[] writeFds = new int [MAX_SELECTABLE_FDS + 1];
        // Holds the FDs on which an exceptional event occurred
        private final int[] exceptFds = new int [MAX_SELECTABLE_FDS + 1];

        private SubSelector() {
            this.pollArrayIndex = 0; // main thread
        }

        private SubSelector(int threadIndex) { // helper threads
            this.pollArrayIndex = (threadIndex + 1) * MAX_SELECTABLE_FDS;
        }

        private int poll() throws IOException{ // poll for the main thread
            return poll0(pollWrapper.pollArrayAddress,
                         Math.min(totalChannels, MAX_SELECTABLE_FDS),
                         readFds, writeFds, exceptFds, timeout);
        }

        private int poll(int index) throws IOException {
            // poll for helper threads
            return  poll0(pollWrapper.pollArrayAddress +
                     (pollArrayIndex * PollArrayWrapper.SIZE_POLLFD),
                     Math.min(MAX_SELECTABLE_FDS,
                             totalChannels - (index + 1) * MAX_SELECTABLE_FDS),
                     readFds, writeFds, exceptFds, timeout);
        }

        private native int poll0(long pollAddress, int numfds,
             int[] readFds, int[] writeFds, int[] exceptFds, long timeout);
             ...
}
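
The slices of pollArray handed to poll0 by the main thread and by each helper thread follow directly from the two poll methods above. The sketch below reproduces only that arithmetic; PollSlices is a hypothetical class.

```java
class PollSlices {
    static final int MAX_SELECTABLE_FDS = 1024;

    // {start index, number of entries} polled by the main thread.
    static int[] mainSlice(int totalChannels) {
        return new int[] { 0, Math.min(totalChannels, MAX_SELECTABLE_FDS) };
    }

    // {start index, number of entries} polled by the helper thread with the given index.
    static int[] helperSlice(int threadIndex, int totalChannels) {
        int start = (threadIndex + 1) * MAX_SELECTABLE_FDS;
        return new int[] { start, Math.min(MAX_SELECTABLE_FDS, totalChannels - start) };
    }

    public static void main(String[] args) {
        int totalChannels = 1026; // e.g. right after the 1024th channel was registered
        int[] main = mainSlice(totalChannels);
        int[] helper = helperSlice(0, totalChannels);
        System.out.println("main:   start=" + main[0] + " count=" + main[1]);
        System.out.println("helper: start=" + helper[0] + " count=" + helper[1]);
    }
}
```

With totalChannels = 1026, the first helper polls two entries: the wakeup socket stored at index 1024 and the channel at index 1025.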

updateSelectedKeys

  1. updateSelectedKeys(action) at WindowsSelectorImpl#doSelect <5> is used to handle every ready channel.
  • If the channel's key has not yet been added to selectedKeys, its ReadyOps are set, and the key is added to selectedKeys when at least one ready operation is among the operations the key is interested in. This is checked by (ski.nioReadyOps() & ski.nioInterestOps()) != 0.
  • If the key is already in selectedKeys, its ReadyOps are updated with the operations that are ready now. The Consumer<SelectionKey> action is handled here too: in the code below, when an action is supplied, any ready information previously recorded in ReadyOps is discarded and ReadyOps is set directly before the action is invoked.
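
The bit test mentioned above can be tried on its own with the public SelectionKey constants. ReadyMaskDemo is a hypothetical helper used only for illustration.

```java
import java.nio.channels.SelectionKey;

class ReadyMaskDemo {
    // True when at least one ready operation is also an operation of interest.
    static boolean shouldSelect(int readyOps, int interestOps) {
        return (readyOps & interestOps) != 0;
    }

    public static void main(String[] args) {
        int interest = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
        System.out.println(shouldSelect(SelectionKey.OP_READ, interest));
        System.out.println(shouldSelect(SelectionKey.OP_WRITE, SelectionKey.OP_READ));
    }
}
```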
//sun.nio.ch.WindowsSelectorImpl#updateSelectedKeys
private int updateSelectedKeys(Consumer<SelectionKey> action) {
    updateCount++;
    int numKeysUpdated = 0;
    numKeysUpdated += subSelector.processSelectedKeys(updateCount, action);
    for (SelectThread t: threads) {
        numKeysUpdated += t.subSelector.processSelectedKeys(updateCount, action);
    }
    return numKeysUpdated;
}
//sun.nio.ch.SelectorImpl#processReadyEvents
protected final int processReadyEvents(int rOps,
                                        SelectionKeyImpl ski,
                                        Consumer<SelectionKey> action) {
    if (action != null) {
        ski.translateAndSetReadyOps(rOps);
        if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
            action.accept(ski);
            ensureOpen();
            return 1;
        }
    } else {
        assert Thread.holdsLock(publicSelectedKeys);
        if (selectedKeys.contains(ski)) {
            if (ski.translateAndUpdateReadyOps(rOps)) {
                return 1;
            }
        } else {
            ski.translateAndSetReadyOps(rOps);
            if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
                selectedKeys.add(ski);
                return 1;
            }
        }
    }
    return 0;
}
//sun.nio.ch.WindowsSelectorImpl.SubSelector#processSelectedKeys
private int processSelectedKeys(long updateCount, Consumer<SelectionKey> action) {
    int numKeysUpdated = 0;
    numKeysUpdated += processFDSet(updateCount, action, readFds,
                                    Net.POLLIN,
                                    false);
    numKeysUpdated += processFDSet(updateCount, action, writeFds,
                                    Net.POLLCONN |
                                    Net.POLLOUT,
                                    false);
    numKeysUpdated += processFDSet(updateCount, action, exceptFds,
                                    Net.POLLIN |
                                    Net.POLLCONN |
                                    Net.POLLOUT,
                                    true);
    return numKeysUpdated;
}

    /**
    * sun.nio.ch.WindowsSelectorImpl.SubSelector#processFDSet
    * updateCount is used to tell if a key has been counted as updated
    * in this select operation.
    *
    * me.updateCount <= updateCount
    */
private int processFDSet(long updateCount,
                            Consumer<SelectionKey> action,
                            int[] fds, int rOps,
                            boolean isExceptFds)
{
    int numKeysUpdated = 0;
    for (int i = 1; i <= fds[0]; i++) {
        int desc = fds[i];
        if (desc == wakeupSourceFd) {
            synchronized (interruptLock) {
                interruptTriggered = true;
            }
            continue;
        }
        MapEntry me = fdMap.get(desc);
        // If me is null, the key was deregistered in the previous
        // processDeregisterQueue.
        if (me == null)
            continue;
        SelectionKeyImpl sk = me.ski;

        // The descriptor may be in the exceptfds set because there is
        // OOB data queued to the socket. If there is OOB data then it
        // is discarded and the key is not added to the selected set.
        if (isExceptFds &&
            (sk.channel() instanceof SocketChannelImpl) &&
            discardUrgentData(desc))
        {
            continue;
        }
        // This is the part we should focus on
        int updated = processReadyEvents(rOps, sk, action);
        if (updated > 0 && me.updateCount != updateCount) {
            me.updateCount = updateCount;
            numKeysUpdated++;
        }
    }
    return numKeysUpdated;
}
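
The action branch of processReadyEvents is reachable from user code through Selector#select(Consumer, long), available since Java 11. The sketch below assumes Java 11 or later and again uses a Pipe as the channel. SelectActionDemo is a hypothetical class.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

class SelectActionDemo {
    // Returns {keys handled by the action, readyOps seen by the action,
    //          size of selectedKeys after the call}.
    static int[] run() {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            try {
                pipe.source().configureBlocking(false);
                pipe.source().register(selector, SelectionKey.OP_READ);
                pipe.sink().write(ByteBuffer.wrap(new byte[] { 1 }));

                int[] seenReadyOps = { 0 };
                // The action receives each ready key directly.
                int handled = selector.select(key -> seenReadyOps[0] = key.readyOps(), 1000);
                return new int[] { handled, seenReadyOps[0], selector.selectedKeys().size() };
            } finally {
                pipe.source().close();
                pipe.sink().close();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println("handled=" + r[0] + " readyOps=" + r[1] + " selectedKeys=" + r[2]);
    }
}
```

When an action is supplied, keys are passed to it instead of being added to the selected-key set, which matches the action != null branch shown above.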

That concludes our look at Selector. In the next article, we will talk about Java NIO Buffer.